저렴하게 실시간 분석 아키텍처 만들기 with aws, terraform
서비스를 운영하며 수집되는 데이터는 방대하고 이러한 데이터들 속에서 어떻게 인사이트를 얻어서 서비스를 발전시킬 수 있을까?? 항상 진행하고 싶은 업무 중의 하나이다. 인사이트를 얻기 위해선 데이터를 분석할 수 있는 아키텍처가 필요하다.
이러한 아키텍처에서 요구되는 기능은 보통 아래와 같다.
1. 데이터 수집
2. 데이터 처리 및 변환
3. 변환 데이터 저장
4. 데이터 분석
여기서 해당 기능들을 사용할 수 있는 다양한 서비스들을 사용할 수 있다. 하지만 나는 최대한 저렴하게 이용할 수 있는 아키텍처를 구성해보자.
아키텍처
가격은 저렴하고, 확장성 있게 만들어보려고 한다. 그래서 나는 아래와 같이 선택했다.
1. 데이터 수집 -> Kinesis Data Firehose PUT API
2. 데이터 처리 및 변환 -> Kinesis DataFirehose & Lambda
3. 변환 데이터 저장 -> S3
4. 데이터 분석 -> Athena
애플리케이션에서 Kinesis DataFirehose의 PUT API를 이용해서 KinesisData Firehose로 보내고 적정한 크기의 버퍼를 설정하고 Lambda 데이터를 변환하여 S3로 보낸다.
S3에서 형식에 맞게 저장된 데이터를 Athena를 통해서 쿼리를 보낼 수 있다. 이것을 Amazon QuickSight와 연동해서 시각화를 할 수 있다. 아니면 Grafana를 따로 구현해서 할 수 도 있다.
전체적인 아키텍처는 아래와 같다.

요즘 rust를 공부하고 있어서 rust 코드로 간단하게 아래와 같이 구현해 봤다.
use std::env; use aws_config::default_provider::region::DefaultRegionChain; use aws_config::default_provider::credentials::DefaultCredentialsChain; use aws_sdk_firehose::{Client, Error}; use aws_sdk_firehose::model::Record; use aws_sdk_firehose::types::Blob; use serde::{Deserialize, Serialize}; use std::process; #[derive(Serialize, Deserialize, Debug)] struct Data { name: String, age: i32, } #[tokio::main] async fn main() -> Result<(), Error> { let profile = env::args().skip(1).next().expect("profile to use is required"); let region = DefaultRegionChain::builder().profile_name(&profile).build().region().await; let creds = DefaultCredentialsChain::builder() .profile_name(&profile) .region(region.clone()) .build() .await; let config = aws_config::from_env().credentials_provider(creds).region(region).load().await; let client = Client::new(&config); // Define the data to send let data = Data { name: "test".to_string(), age: 10, }; // Serialize the data to JSON let data_json = serde_json::to_string(&data).expect("Serialization failed"); // Define the delivery stream name let firehose = "data-analytics"; // Create a PutRecordInput request let record = Record::builder() .data(Blob::new(data_json.into_bytes())) .build(); let request = client .put_record() .delivery_stream_name(firehose) .record(record); // Send the record to Firehose match request.send().await { Ok(_) => println!("Record submitted successfully!"), Err(e) => { eprintln!("Failed to submit record: {}", e); process::exit(1); } } Ok(()) }
aws-config = "0.13.0" aws-sdk-firehose = "0.13.0" tokio = { version = "1", features = ["full"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0"
디펜던시는 아래와 같다.
그럼 이제 Terraform으로 만들어야 할 것은 아래와 같다.
1. S3
2. Kinesis Data Firehose
3. Lambda
4. Athena
이다. terraform 코드는 여기 Github에서 볼 수 있다.
Lambda에서 쓰이는 코드는 아래와 같다.
use lambda_runtime::{service_fn, Error, LambdaEvent}; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use base64::decode; #[derive(Deserialize)] struct Record { recordId: String, data: String, } #[derive(Deserialize, Debug)] // Debug 트레이트 추가 struct Data { age: u32, } #[derive(Serialize)] struct OutputRecord { recordId: String, result: String, data: String, metadata: Metadata, } #[derive(Serialize)] struct Metadata { partitionKeys: PartitionKeys, // 필드 이름 수정 } #[derive(Serialize)] struct PartitionKeys { age: String, } #[tokio::main] async fn main() -> Result<(), Error> { lambda_runtime::run(service_fn(handler)).await?; Ok(()) } async fn handler(event: LambdaEvent<Value>) -> Result<Value, Error> { let (event, _context) = event.into_parts(); let records = event["records"].as_array().unwrap(); let mut output = Vec::new(); for record in records { let record_id = record["recordId"].as_str().unwrap().to_string(); let data_base64 = record["data"].as_str().unwrap(); let decoded_data = decode(data_base64).unwrap(); let payload: Data = serde_json::from_slice(&decoded_data).unwrap(); let partition_keys = PartitionKeys { age: payload.age.to_string(), }; let output_record = OutputRecord { recordId: record_id, result: "Ok".to_string(), data: data_base64.to_string(), metadata: Metadata { partitionKeys: partition_keys }, // 올바르게 수정 }; println!("{:?}", payload); // Debug이므로 이제 오류 없음 output.push(output_record); } Ok(json!({ "records": output })) }
Dockerfile은 이거다.
FROM rust:1.70 as builder WORKDIR /usr/src/app COPY Cargo.toml Cargo.lock ./ RUN mkdir src && echo "fn main() {}" > src/main.rs && cargo fetch COPY src ./src RUN cargo build --release # 실행 스테이지 FROM public.ecr.aws/lambda/provided:al2023-arm64 COPY --from=builder /usr/src/app/target/release/docker ${LAMBDA_RUNTIME_DIR}/bootstrap CMD ["bootstrap"]
athena에서 테이블을 생성할 때 쓰이는 쿼리는 아래와 같다.
CREATE EXTERNAL TABLE IF NOT EXISTS `data_analytics`.`person` (`name` string) PARTITIONED BY ( `year` int, `month` int, `day` int, `age` int ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' WITH SERDEPROPERTIES ( 'ignore.malformed.json' = 'FALSE', 'dots.in.keys' = 'FALSE', 'case.insensitive' = 'TRUE', 'mapping' = 'TRUE' ) STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://data-analytics-dkim/' TBLPROPERTIES ('classification' = 'json');
이제 테스트해보자.
아테나는 테이블을 만들고 파티션은 로드해줘야 정상 동작한다. 쿼리는 아래와 같다.
MSCK REPAIR TABLE `person`;
이제 kinesis firehose로 데이터를 삽입하는 코드를 실행시키자. 나는 rust로 작성했지만, 다른 코드로 해도 무방하다.
kinesis -> lambda -> s3로 동작하여 아래와 같이 데이터가 쌓이게 된다.

이제 아테나에 쿼리를 실행할 수 있고, 아래와 같이 조회된다.

다른 데이터 분석 아키텍처도 존재하지만 내가 가장 저렴하다고 한 이유는 모두 사용한 만큼만 비용을 지불하기 때문이다.
현재는 Lambda 코드에서 별도의 데이터 변환하지 않았지만, 데이터를 가공할 수 있어 유연하게 변경 가능하다.
하지만 문제점도 있다.
1. 이전에 쌓지 못한 데이터는 Glue를 이용해서 S3에 적재해줘야 함.
2. API가 생기거나 분석할 데이터가 생길 때마다 firehose 혹은 lambda 코드를 변경해야 함
물론 가격과 편리함의 트레이드오프가 존재하는 거 같다.
시각화 도구는 오픈 소스를 사용해도 되고, Quicksight를 사용해도 된다.
'Aws' 카테고리의 다른 글
AWS Private Subnet S3 Data Transfer 비용 (2) | 2024.11.05 |
---|---|
Aws Lambda 파일 시스템의 필요성 (0) | 2024.10.24 |
AWS S3 LifeCycle with Terraform (0) | 2024.09.28 |
AWS Security Specialty (SCS-C02) 취득 후기 (1) | 2024.01.23 |
AWS CloudFront 정적 + 동적 컨텐츠 캐시 활용 - 1 (1) | 2023.11.30 |
댓글
이 글 공유하기
다른 글
-
AWS Private Subnet S3 Data Transfer 비용
AWS Private Subnet S3 Data Transfer 비용
2024.11.05기존의 Private Subnet에서 동작하는 EC2와 Lambda 리소스들은 프로세싱을 작업을 위해서 S3에서 객체를 다운로드하고, 후처리 작업을 한다. 나는 이때 Compute Resource인 EC2와 Lambda에는 Data Transfer(이하 DT) 비용이 발생하지 않는 것으로 알고 있었다. 하지만 내 생각은 틀렸고 개선할 부분을 찾을 수 있었다. 우선 기존 아키텍처는 아래와 같다. 요청의 흐름은 다음과 같다. 1. Compute Resource -> Nat Gateway2. Nat Gateway -> Internet gateway3. Internet Gateway -> Internet4. Internet -> S35. 역순 이때 나는 S3는 같은 리전의 서비스가 접근할 때는 DT가 발생하지… -
Aws Lambda 파일 시스템의 필요성
Aws Lambda 파일 시스템의 필요성
2024.10.24업무를 하다보니 파일을 읽어들이고 데이터를 생성하는 Lambda가 필요해졌다. S3에서 파일을 가져오는 방식으로 설계했지만 실패하고 결국 파일 시스템을 사용하게 됐다. 그 과정에서 생긴 이슈와 내가 놓친 부분들을 보려고 한다. Lambda와 S3 네트워크 나는 vpc endpoint gateway 유형을 사용하고 있었기에 Lambda에서 S3를 호출할 경우 Public 네트워크가 아닌 내부 네트워크를 이용하게 된다. 나는 이점에서 빠른 데이터를 다운로드 받을 수 있다고 생각했다. 하지만 난 틀렸다. 실제로 S3는 100Gbps를 지원한다. 내가 간과한 것은 Lambda의 네트워크 대역폭이었다. 실제로 AWS 측에서 세부적으로 밝힌 것은 없지만, 커뮤니티들을 살펴봤을 때 Lambda의 메모리를 올렸을 때 … -
AWS S3 LifeCycle with Terraform
AWS S3 LifeCycle with Terraform
2024.09.28입사하고 빌링을 봤을 때 가장 많은 비용을 차지하는 게 S3 저장 비용이었다. 라이프 싸이클의 옵션을 알고 있었고, 비용 절감 가능성에 대해서 알고 있었다. 하지만 문제는 비즈니스의 이해였다. 객체들의 액세스 패턴을 알지 못했기에 섣불리 적용할 수 없었다. 비즈니스를 파악하고, 적용하며 얻었던 지식이다. S3 라이프 사이클 aws s3의 라이프 사이클 옵션은 aws s3 lifecycle docs에서 확인할 수 있고, 중요점만 살펴보자. 기본적으로 s3를 만들고 객체를 생성하면 Standard 객체 클래스로 생성된다. 범용성 있게 사용할 수 있는 옵션이지만 저장 가격은 높다고 볼 수 있다. (ap-northeast-2 seoul 기준 GB당 0.025USD) 그렇기에 객체 액세스 패턴을 살펴보고 가격… -
AWS Security Specialty (SCS-C02) 취득 후기
AWS Security Specialty (SCS-C02) 취득 후기
2024.01.23DOP-C02를 합격하니, 50퍼센트 할인권을 받았다. 가장 높은 단계인 Specialty 단계를 목표로 했고, 사내에서 데이터 관련 프로젝트를 진행하는 게 있어서 그런지 Security가 매력적으로 느껴졌다. 한 달을 준비후 시험에 응시했다. 준비 과정 1. 인강 수강 2. 실습해보기 3. 덤프 풀기 인강을 수강하는 건 언제나 옳았고, 덤프를 맹신하지 않는 게 좋다고 생각한다. 준비 과정이 저번 자격증을 응시했을 때와 다른 점은 혼자 실습해 봤다. 강의를 듣다 보면 하나의 서비스에 대해서 소개하고, Organization에서 어떻게 동작하는지 혹은 활용하는지에 대해서 많이 나온다. SAA와 DOP를 따며, AWS 서비스의 전반적인 틀은 알고 있었지만, Organization은 실제로 사용해 본 적이 없…
댓글을 사용할 수 없습니다.