AWS Lambda 직접 구현하기 - 2 (요청 애플리케이션 생성)
이제 구현을 할 차례인데 나는 SpringBoot 백엔드 개발자 경력이 있기에 Web Application으로는 SpringBoot로 구축할 예정이다.
우선 아키텍처는 다음과 같다.
여기서 ALB와 연결된 Wab Application과 FireCracker에서 실행되는 Web Application이 2개가 있다. 이제 A, B로 칭하겠다. 요구사항을 한 번 정리해 보자.
A Web Application 요구 사항
1. 사용자가 요청을 보냈을 때 DynamoDB를 보고 리소스 여유가 있는 FireCracker EC2로 요청을 보내고 반환
2. Warm Up 상태인 경우에 microVM을 다시 생성하지 않고, Warm Up 상태인 곳을 보내기.
A Web Application(이하 WA)은 DynamoDB를 보고, 요청을 실행할 수 있는 B WP로 보내는 역할을 하는 게 주요 역할이다.
B Web Application 요구 사항
1. 요청이 왔을 때 DynamoDB의 리소스 사용량 기입 후 Firecracker Socket을 활용해서 MicroVM 실행하고, 요청 처리
2. Warm Up이 있을 경우 Warm Up에서 요청 처리
B WP는 A에서 온 요청을 기반으로 MicroVM을 실행하고, MicroVM의 생명 주기를 관리하는 역할을 하게 된다.
A WP는 구현 난이도가 쉬울 것으로 예상되는데, Firecracker 설정과 B WP에서 MicroVM을 생성하는 요청이 어려울 것으로 예상된다.
A WP 구현
A WP 먼저 구현을 하는데 요구 사항이 단 하나이기에 간단할 거 같았지만, 동시성 때문에 어려웠다. 일단 로직 먼저 살펴보자. 모든 코드는 해당 Github에 있다.
사용자가 요청을 보냈을 때 DynamoDB를 보고 리소스 여유가 있는 FireCracker EC2로 요청을 보내고 반환
1. 리소스 할당
간단하게 사용자에게 요청받는 것은 아래와 같다.
@Getter
public class StartVMRequest {
private Integer requestMemory;
private String architect;
private String language;
private String arn;
private String bucketName;
private String filePath;
private String env;
}
해당 firecracker에게 모두 넘겨줘야 하고, requestMemory를 수용할 수 있는 인스턴스가 있는지 DynamoDB 값을 A WP에서 비교해서 확인하는 로직이 필요했다.
## Service
private FireCrackerInfo getAvailableFireCrackerInstance(ResourceRequest resourceRequest) {
return fireCrackerRepository.findByRemainResourceIp(resourceRequest.getRequestMemory())
.orElseThrow(() -> new RuntimeException("No available FireCracker instance found."));
}
## Repository
public Optional<FireCrackerInfo> findByRemainResourceIp(Integer requestMemory) {
return fireCrackerInfoDynamoDbTable.scan(ScanEnhancedRequest.builder().build())
.items()
.stream()
.filter(item -> item.getRemainMemory() >= requestMemory)
.min(Comparator.comparing(FireCrackerInfo::getRemainMemory));
}
Service 쪽에서 Repository 쪽으로 필요한 용량을 보내고, Repository에서는 용량을 수용할 수 있는 firecracker 정보를 넘겨준다.
여기서 가용 가능한 firecracker가 여러 개 있을 수도 있는데, 나는 가장 여유가 없는 firecracker로 할당하기로 정책을 정했다. 이유는 용량이 큰 요청이 왔을 때 모두가 수용할 수 없을 수도 있는 상황을 고려했다.
그다음 점유했다는 것을 표현하기 위해서 DynamoDB의 값을 업데이트 해야 한다. 로직 자체는 아래와 같다. 정말 간단한 로직들만 있지만, 사실 동시성을 고려해야 한다. 여러 사용자가 동시에 요청하기에 DynamoDB의 동시성을 애플리케이션 단에서 구현해야한다. 그래서 나는 versionId 라는 속성을 만들고, 요청하려는 시점보다 미래라면 거절하고 다시 요청하는 방식으로 구현했다.
## Service
public Object processRequest(ResourceRequest resourceRequest) throws InterruptedException {
FireCrackerInfo useInstance;
while (true) {
try {
# 중략
updateRemainingMemory(useInstance, resourceRequest);
break;
} catch (ConditionalCheckFailedException e){
logger.warn("Version modified, retrying updateRemainingMemory.", e);
}
}
# 중략
}
## Repository
public void updateRemainMemoryIfVersionFuture(FireCrackerInfo useInstance, long versionId, ResourceRequest resourceRequest) {
fireCrackerInfoDynamoDbTable.updateItem(buildUpdateRequestWithCondition(useInstance, versionId, resourceRequest.getRequestMemory()));
}
private UpdateItemEnhancedRequest<FireCrackerInfo> buildUpdateRequestWithCondition(FireCrackerInfo fireCrackerInfo, long versionId, int requestMemory) {
return UpdateItemEnhancedRequest.builder(FireCrackerInfo.class)
.item(buildRequestUpdatedItem(fireCrackerInfo, versionId, requestMemory))
.conditionExpression(buildConditionExpression(versionId))
.build();
}
private FireCrackerInfo buildRequestUpdatedItem(FireCrackerInfo fireCrackerInfo, long versionId, int requestMemory) {
return FireCrackerInfo.builder()
.ip(fireCrackerInfo.getIp())
.remainMemory(fireCrackerInfo.getRemainMemory() - requestMemory)
.versionId(versionId)
.build();
}
private Expression buildConditionExpression(long versionId) {
return Expression.builder()
.expression("versionId <= :newVersionId")
.putExpressionValue(":newVersionId", AttributeValue.builder().n(Long.toString(versionId)).build())
.build();
}
이렇게 리소스를 점유하는 로직까지는 구성했고, 요청을 보내서 microVM을 생성하는 B WP는 다음 글에서 구현하겠다. 요청을 보내는 것은 아래와 같다.
public Object processRequest(ResourceRequest resourceRequest) throws InterruptedException {
# 중략
return requestFireCracker(useInstance, resourceRequest).getData();
}
private ResourceResponse requestFireCracker(final FireCrackerInfo usedInstance, final ResourceRequest resourceRequest) {
ResponseEntity<ResourceResponse> responseEntity = restTemplate.postForEntity(buildRequestURI(usedInstance.getIp()), resourceRequest, ResourceResponse.class);
HttpStatusCode statusCode = responseEntity.getStatusCode();
releaseResource(usedInstance, resourceRequest, LocalDateTime.now().toEpochSecond(ZoneOffset.UTC));
if (statusCode.is2xxSuccessful()) {
return responseEntity.getBody();
} else {
throw new FailedFireCracker(responseEntity.getBody().getData());
}
}
private URI buildRequestURI(String ip) {
try {
return new URI("http", null, ip, -1, "/request", null, null);
} catch (URISyntaxException e) {
// 예외 처리
throw new IllegalArgumentException("Invalid IP address: " + ip, e);
}
}
요청을 보내고 성공했던, 오류가 발생했던 리소스를 해제시키는 작업이 필요하다. 따라서 releaseResource 로직을 넣었고 이것도 마찬가지로 동시성을 고려해서 versionId 비교를 해야한다.
## Service
private void releaseResource(FireCrackerInfo useInstance, ResourceRequest resourceRequest, long now) {
try {
fireCrackerRepository.releaseResource(useInstance, resourceRequest, now);
} catch (ConditionalCheckFailedException e) {
logger.warn("Version modified, retrying releaseResource.", e);
releaseResource(useInstance, resourceRequest, now);
}
}
## Repository
public void releaseResource(FireCrackerInfo usedInstance, ResourceRequest resourceRequest, long versionId) {
fireCrackerInfoDynamoDbTable.updateItem(buildReleaseRequestWithCondition(usedInstance, versionId, resourceRequest.getRequestMemory()));
}
private UpdateItemEnhancedRequest<FireCrackerInfo> buildReleaseRequestWithCondition(FireCrackerInfo fireCrackerInfo, long versionId, int requestMemory) {
return UpdateItemEnhancedRequest.builder(FireCrackerInfo.class)
.item(buildReleaseUpdatedItem(fireCrackerInfo, versionId, requestMemory))
.conditionExpression(buildConditionExpression(versionId))
.build();
}
private FireCrackerInfo buildReleaseUpdatedItem(FireCrackerInfo fireCrackerInfo, long versionId, int requestMemory) {
return FireCrackerInfo.builder()
.ip(fireCrackerInfo.getIp())
.remainMemory(fireCrackerInfo.getRemainMemory() + requestMemory)
.versionId(versionId)
.build();
}
private Expression buildConditionExpression(long versionId) {
return Expression.builder()
.expression("versionId <= :newVersionId")
.putExpressionValue(":newVersionId", AttributeValue.builder().n(Long.toString(versionId)).build())
.build();
}
이렇게 하면 A WP는 구현이 완료된다. 물론 B WP를 만들며, 로직이 수정될 수도 있다. 완성된 Service와 Repository는 아래와 같다.
Service
package request_application.request_instance.dynamo.service;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatusCode;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import request_application.request_instance.dynamo.controller.dto.ResourceResponse;
import request_application.request_instance.dynamo.controller.dto.ResourceRequest;
import request_application.request_instance.dynamo.exception.FailedFireCracker;
import request_application.request_instance.dynamo.repository.FireCrackerRepository;
import request_application.request_instance.dynamo.table.FireCrackerInfo;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import static java.lang.Thread.sleep;
@Service
@RequiredArgsConstructor
public class FireCrackerService {
private static final Logger logger = LoggerFactory.getLogger(FireCrackerService.class);
private final FireCrackerRepository fireCrackerRepository;
private final RestTemplate restTemplate;
public Object processRequest(ResourceRequest resourceRequest) throws InterruptedException {
FireCrackerInfo useInstance;
while (true) {
try {
useInstance = getAvailableFireCrackerInstance(resourceRequest);
updateRemainingMemory(useInstance, resourceRequest);
break;
} catch (ConditionalCheckFailedException e){
logger.warn("Version modified, retrying updateRemainingMemory.", e);
}
}
sleep(2000);
return requestFireCracker(useInstance, resourceRequest).getData();
}
private FireCrackerInfo getAvailableFireCrackerInstance(ResourceRequest resourceRequest) {
return fireCrackerRepository.findByRemainResourceIp(resourceRequest.getRequestMemory())
.orElseThrow(() -> new RuntimeException("No available FireCracker instance found."));
}
private void updateRemainingMemory(FireCrackerInfo useInstance, ResourceRequest resourceRequest) {
fireCrackerRepository.updateRemainMemoryIfVersionFuture(useInstance, LocalDateTime.now().toEpochSecond(ZoneOffset.UTC), resourceRequest);
}
private void releaseResource(FireCrackerInfo useInstance, ResourceRequest resourceRequest, long now) {
try {
fireCrackerRepository.releaseResource(useInstance, resourceRequest, now);
} catch (ConditionalCheckFailedException e) {
logger.warn("Version modified, retrying releaseResource.", e);
releaseResource(useInstance, resourceRequest, now);
}
}
private ResourceResponse requestFireCracker(final FireCrackerInfo usedInstance, final ResourceRequest resourceRequest) {
ResponseEntity<ResourceResponse> responseEntity = restTemplate.postForEntity(buildRequestURI(usedInstance.getIp()), resourceRequest, ResourceResponse.class);
HttpStatusCode statusCode = responseEntity.getStatusCode();
releaseResource(usedInstance, resourceRequest, LocalDateTime.now().toEpochSecond(ZoneOffset.UTC));
if (statusCode.is2xxSuccessful()) {
return responseEntity.getBody();
} else {
throw new FailedFireCracker(responseEntity.getBody().getData());
}
}
private URI buildRequestURI(String ip) {
try {
return new URI("http", null, ip, -1, "/request", null, null);
} catch (URISyntaxException e) {
// 예외 처리
throw new IllegalArgumentException("Invalid IP address: " + ip, e);
}
}
}
Repository
package request_application.request_instance.dynamo.repository;
import org.springframework.stereotype.Component;
import request_application.request_instance.dynamo.controller.dto.ResourceRequest;
import request_application.request_instance.dynamo.table.FireCrackerInfo;
import software.amazon.awssdk.enhanced.dynamodb.*;
import software.amazon.awssdk.enhanced.dynamodb.model.ScanEnhancedRequest;
import software.amazon.awssdk.enhanced.dynamodb.model.UpdateItemEnhancedRequest;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import java.util.Comparator;
import java.util.Optional;
@Component
public class FireCrackerRepository {
private final DynamoDbTable<FireCrackerInfo> fireCrackerInfoDynamoDbTable;
public FireCrackerRepository(DynamoDbEnhancedClient dynamoDbEnhancedClient) {
this.fireCrackerInfoDynamoDbTable = dynamoDbEnhancedClient
.table("firecracker", TableSchema.fromImmutableClass(FireCrackerInfo.class));
}
public void updateRemainMemoryIfVersionFuture(FireCrackerInfo useInstance, long versionId, ResourceRequest resourceRequest) {
fireCrackerInfoDynamoDbTable.updateItem(buildUpdateRequestWithCondition(useInstance, versionId, resourceRequest.getRequestMemory()));
}
public void releaseResource(FireCrackerInfo usedInstance, ResourceRequest resourceRequest, long versionId) {
fireCrackerInfoDynamoDbTable.updateItem(buildReleaseRequestWithCondition(usedInstance, versionId, resourceRequest.getRequestMemory()));
}
public Optional<FireCrackerInfo> findByRemainResourceIp(Integer requestMemory) {
return fireCrackerInfoDynamoDbTable.scan(ScanEnhancedRequest.builder().build())
.items()
.stream()
.filter(item -> item.getRemainMemory() >= requestMemory)
.min(Comparator.comparing(FireCrackerInfo::getRemainMemory));
}
private UpdateItemEnhancedRequest<FireCrackerInfo> buildUpdateRequestWithCondition(FireCrackerInfo fireCrackerInfo, long versionId, int requestMemory) {
return UpdateItemEnhancedRequest.builder(FireCrackerInfo.class)
.item(buildRequestUpdatedItem(fireCrackerInfo, versionId, requestMemory))
.conditionExpression(buildConditionExpression(versionId))
.build();
}
private FireCrackerInfo buildRequestUpdatedItem(FireCrackerInfo fireCrackerInfo, long versionId, int requestMemory) {
return FireCrackerInfo.builder()
.ip(fireCrackerInfo.getIp())
.remainMemory(fireCrackerInfo.getRemainMemory() - requestMemory)
.versionId(versionId)
.build();
}
private UpdateItemEnhancedRequest<FireCrackerInfo> buildReleaseRequestWithCondition(FireCrackerInfo fireCrackerInfo, long versionId, int requestMemory) {
return UpdateItemEnhancedRequest.builder(FireCrackerInfo.class)
.item(buildReleaseUpdatedItem(fireCrackerInfo, versionId, requestMemory))
.conditionExpression(buildConditionExpression(versionId))
.build();
}
private FireCrackerInfo buildReleaseUpdatedItem(FireCrackerInfo fireCrackerInfo, long versionId, int requestMemory) {
return FireCrackerInfo.builder()
.ip(fireCrackerInfo.getIp())
.remainMemory(fireCrackerInfo.getRemainMemory() + requestMemory)
.versionId(versionId)
.build();
}
private Expression buildConditionExpression(long versionId) {
return Expression.builder()
.expression("versionId <= :newVersionId")
.putExpressionValue(":newVersionId", AttributeValue.builder().n(Long.toString(versionId)).build())
.build();
}
}
'기타' 카테고리의 다른 글
AWS Lambda 직접 구현하기 - 4 (Firecracker Java 애플리케이션으로 실행) (0) | 2024.12.25 |
---|---|
AWS Lambda 직접 구현하기 - 3 (Firecracker 실행) (1) | 2024.12.14 |
AWS Lambda 직접 구현하기 - 1 (시작) (0) | 2024.11.27 |
Backend -> DevOps 직무 전환 4개월 차 회고 (2) | 2024.09.28 |
첫 퇴사 회고 (0) | 2024.05.03 |
댓글
이 글 공유하기
다른 글
-
AWS Lambda 직접 구현하기 - 4 (Firecracker Java 애플리케이션으로 실행)
AWS Lambda 직접 구현하기 - 4 (Firecracker Java 애플리케이션으로 실행)
2024.12.25 -
AWS Lambda 직접 구현하기 - 3 (Firecracker 실행)
AWS Lambda 직접 구현하기 - 3 (Firecracker 실행)
2024.12.14 -
AWS Lambda 직접 구현하기 - 1 (시작)
AWS Lambda 직접 구현하기 - 1 (시작)
2024.11.27 -
Backend -> DevOps 직무 전환 4개월 차 회고
Backend -> DevOps 직무 전환 4개월 차 회고
2024.09.28