본 내용은 spring.io/guides/gs/async-method/ 를 참고하여 작성한다.
CompletableFuture가 어떻게 동작하는지 확인한다.
코드가 동작하는 시나리오는 https://api.github.com/users/{user} API를 호출하여 그 결과를 보여주는 것이다.
동시에 3개의 API를 호출하게 될 경우 sync로 받게 될 경우 1개의 API 당 1초가 걸리게 될 경우 3초 + Alpha의 시간이 걸릴 것인데 결과는 어떻게 나오게 될지 확인해 본다.
먼저 값을 담아 올 User 객체를 만든다. 참고로 Api Response는 다음과 같다.
{
"login": "pivotalsoftware",
"id": 4247270,
"node_id": "MDEyOk9yZ2FuaXphdGlvbjQyNDcyNzA=",
"avatar_url": "https://avatars0.githubusercontent.com/u/4247270?v=4",
"gravatar_id": "",
"url": "https://api.github.com/users/pivotalsoftware",
"html_url": "https://github.com/pivotalsoftware",
"followers_url": "https://api.github.com/users/pivotalsoftware/followers",
"following_url": "https://api.github.com/users/pivotalsoftware/following{/other_user}",
"gists_url": "https://api.github.com/users/pivotalsoftware/gists{/gist_id}",
"starred_url": "https://api.github.com/users/pivotalsoftware/starred{/owner}{/repo}",
"subscriptions_url": "https://api.github.com/users/pivotalsoftware/subscriptions",
"organizations_url": "https://api.github.com/users/pivotalsoftware/orgs",
"repos_url": "https://api.github.com/users/pivotalsoftware/repos",
"events_url": "https://api.github.com/users/pivotalsoftware/events{/privacy}",
"received_events_url": "https://api.github.com/users/pivotalsoftware/received_events",
"type": "Organization",
"site_admin": false,
"name": "Pivotal Software, Inc.",
"company": null,
"blog": "http://pivotal.io",
"location": null,
"email": null,
"hireable": null,
"bio": null,
"twitter_username": null,
"public_repos": 35,
"public_gists": 0,
"followers": 0,
"following": 0,
"created_at": "2013-04-24T16:14:01Z",
"updated_at": "2019-06-04T23:42:37Z"
}
이 중에서 name 값과 blog 값만 Java POJO 객체에 담을 수 있도록 한다.
@Getter
@Setter
@JsonIgnoreProperties(ignoreUnknown = true)
public class User {
private String name;
private String blog;
@Override
public String toString() {
return "User [name=" + name + ", blog=" + blog + "]";
}
}
실제로 API 통신을 해서 가져올 Service를 만든다.
@Service
@Slf4j
public class GitHubLookupService {
private final RestTemplate restTemplate;
public GitHubLookupService(RestTemplateBuilder restTemplateBuilder) {
this.restTemplate = restTemplateBuilder.build();
}
@Async
public CompletableFuture<User> findUser(String user) throws InterruptedException {
log.debug("Looking up : " + user);
String url = String.format("https://api.github.com/users/%s", user);
User results = restTemplate.getForObject(url, User.class);
// Artificial delay of 1s for demonstration purposes
Thread.sleep(1000L);
return CompletableFuture.completedFuture(results);
}
}
findUser Method는 @Async annotation을 사용했으며 User 객체가 아닌 CompletableFuture로 감싼 User 객체를 리턴한다.
이렇게 될 경우 API 응답이 느리게 오더라도 먼저 return 하게 되고 실제 값이 리턴될 경우 다시 한번 값을 받게 된다.
Async로 실행하게 될 Thread와 관련된 설정은 다음과 같이 해 준다.
@SpringBootApplication
@EnableAsync
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("GithubLookup-");
executor.initialize();
return executor;
}
}
Springboot main 클래스에 @EnableAsync annotation을 달고 ThreadPoolTaskExecutor를 셋팅해서 반환하는 Bean을 만든다.
기본적으로 pool size를 2개로 하게 될 경우 2개를 먼저 처리 후 나머지 1개를 처리하게 될 것이다.
이제 이를 실행하기 위한 클래스를 만든다.
@Slf4j
@Component
public class AppRunner implements CommandLineRunner {
private final GitHubLookupService gitHubLookupService;
public AppRunner(GitHubLookupService gitHubLookupService) {
this.gitHubLookupService = gitHubLookupService;
}
@Override
public void run(String... args) throws Exception {
// Start the clock
long start = System.currentTimeMillis();
// Kick of multiple, asynchronous lookups
CompletableFuture<User> page1 = gitHubLookupService.findUser("PivotalSoftware");
CompletableFuture<User> page2 = gitHubLookupService.findUser("CloudFoundry");
CompletableFuture<User> page3 = gitHubLookupService.findUser("Spring-Projects");
// Wait until they are all done
CompletableFuture.allOf(page1,page2,page3).join();
// Print results, including elapsed time
log.info("Elapsed time: " + (System.currentTimeMillis() - start));
log.info("--> " + page1.get());
log.info("--> " + page2.get());
log.info("--> " + page3.get());
}
}
gitHubLookupService 3개를 차례대로 호출하고 값을 모아서 출력하게 된다.
실행한 시간과 결과값을 보여준다.
실행은 다음과 같이 하면 된다. 이렇게 실행하게 되면 excutable jar를 실행하는 것과 똑같이 실행된다.
실행을 하게 되면 로그에 다음과 같이 보여지게 된다.
호출은 2 > 1번 순으로 호출 되었지만 결과는 1 > 2번 순으로 오게 되었고, 처리 되지 않았던 마지막 Spring-Projects 는 1번 쓰레드에 의해 다시 호출되게 된다.
각 호출은 1초의 Sleep을 주었기 때문에 Sync로 동작하게 되면 최소 3초 이상이 걸려야 하지만 실제 실행 시간은 2,378ms 가 걸리게 되었다.
그렇다면 pool size를 3으로 변경하면 실행 시간은 어떻게 바뀌게 될까?
pool에 여유가 있어서 3 > 2 > 1 순서로 실행 되었고 결과는 2 > 3 > 1 로 오게 되었다.
각 호출 당 1초 이상의 실행 시간이 필요 하기 때문에 한번에 실행 되어 결과가 모여 졌으므로 실행 시간도 1,592ms 로 짧아 졌다.
간단하게 CompletableFuture를 사용하여 Asynchronous method의 동작 모습을 볼 수 있었다.
AppRunner의 "CompletableFuture.allOf(page1,page2,page3).join();" 에서 볼 수 있는 모습은 마치 Parallel Stream의 fork and join과 비슷하게 동작하는 것을 볼 수 있다.
추가
taskExecutor를 여러개 만들고 나서 특정 taskExecutor를 쓰고 싶을 경우 아래와 같이 하면 된다.
private final GitHubLookupService gitHubLookupService;
private final Executor executor;
public AppRunner(GitHubLookupService gitHubLookupService, @Qualifier("taskExecutor") Executor executor) {
this.gitHubLookupService = gitHubLookupService;
this.executor = executor;
}
@Override
public void run(String... args) throws Exception {
// Start the clock
long start = System.currentTimeMillis();
CompletableFuture.runAsync(() -> {
try {
// Kick of multiple, asynchronous lookups
CompletableFuture<User> page1 = gitHubLookupService.findUser("PivotalSoftware");
CompletableFuture<User> page2 = gitHubLookupService.findUser("CloudFoundry");
CompletableFuture<User> page3 = gitHubLookupService.findUser("Spring-Projects");
// Wait until they are all done
CompletableFuture.allOf(page1,page2,page3).join();
// Print results, including elapsed time
log.info("Elapsed time: " + (System.currentTimeMillis() - start));
log.info("--> " + page1.get());
log.info("--> " + page2.get());
log.info("--> " + page3.get());
} catch (Exception e) {
e.printStackTrace();
}
}, executor);
}
CompletableFuture.runAsync에 2번째 인자로 bean을 넣어주면 된다.
단 동작은 조금 달라지는데 runAsync 안에서 각자 async로 돌아가는 것이라서 3개의 service가 동시에 실행 되기 위해서는 pool이 4개가 있어야 한다. 3개의 pool로 설정할 경우에는 runAync 안에서 1개의 쓰레드를 사용하는 것 같으며 나머지 2개로 gitHubLookupService를 실행하게 된다.
추가2
모든 API의 호출이 완료되어서 join 하기 전에 미리 page1, 2, 3를 보게 된다면 어떨까?
다음과 같이 로그를 찍어봤다.
log.info("--> " + page1);
log.info("--> " + page2);
log.info("--> " + page3);
// Wait until they are all done
CompletableFuture.allOf(page1,page2,page3).join();
위 로그 이미지에서 볼 수 있는 것 처럼 CompletableFuture 객체가 아직 완료 되지 않은 상태로 반환 되었으며, 호출은 추후 이루어지는 것을 볼 수 있다.
Async로 잘 동작하는 것을 알 수 있다.
'Java > Spring' 카테고리의 다른 글
명시적인 instance-id 셋팅으로 instance 샤딩하기 (0) | 2021.01.25 |
---|---|
SQS와 SSE를 이용한 Proxy 서버 만들기 (0) | 2021.01.22 |
API 호출 테스트를 위한 간단한 팁 (0) | 2020.12.21 |
AttributeConverter class registered multiple times 에러가 발생할 경우 (0) | 2020.12.16 |
테스트 코드 작성 시 willReturn 값이 안나오는 경우 (0) | 2020.12.02 |