5주차. 데이터 전처리, csv 파일 읽어 데이터 저장
💡 데이터 저장
AI 담당 팀원으로 부터 전처리된 데이터를 csv 파일 형식으로 받았다.
< 데이터 >
- [1분 단위] 아파트 동별 소비전력 전력 분배 데이터
- [10분 단위] 아파트 층별 소비전력 데이터
해당 데이터를 DB에 저장하기 위해 먼저, csv 파일을 읽는 코드를 짰다.
< Service >
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.stereotype.Service;
@Service
public class CsvReadService {
public List<Map<String, Object>> readCsv(String filePath){
// 추후 return 할 데이터 목록
List<Map<String, Object>> mapList = new ArrayList<Map<String, Object>>();
// return data key 목록
List<String> headerList = new ArrayList<String>();
try{
BufferedReader br = Files.newBufferedReader(Paths.get(filePath));
String line = "";
while((line = br.readLine()) != null){
List<String> stringList = new ArrayList<String>();
String stringArray[] = line.split(",");
stringList = Arrays.asList(stringArray);
// csv 1열 데이터를 header로 인식
if(headerList.size() == 0){
headerList = stringList;
} else {
Map<String, Object> map = new HashMap<String, Object>();
// header 컬럼 개수를 기준으로 데이터 set
for(int i = 0; i < headerList.size(); i++){
map.put(headerList.get(i), stringList.get(i));
}
mapList.add(map);
}
}
} catch (IOException e) {
e.printStackTrace();
}
return mapList;
}
}
위의 서비스를 이용하여 프로그램 실행시 자동으로 해당 데이터가 있는지 판단하고, 없으면 DB에 저장하는 코드를 구현했다.
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
@Component
@Slf4j
@RequiredArgsConstructor
public class InitEnergyDB {
private final BuildingRepository buildingRepository;
private final BuildingPerTenMinuteRepository buildingPerTenMinuteRepository;
private final BuildingPerMinuteRepository buildingPerMinuteRepository;
CsvReadService csvReadService = new CsvReadService();
@PostConstruct
public void init() {
buildingInit();
log.info("building init completed!");
floorInit();
log.info("Floor init completed");
buildingPerMinuteInit();
log.info("buildingPerMinute init completed");
buildingEnergyPriceInit();
log.info("buildingEnergyPrice init completed");
}
public void buildingInit(){ // 동별 전력 사용량 DB 저장 - 아파트_동별_소비전력_전력분배
if(buildingRepository.findById(10000L).isPresent()){ // 데이터 유무 확인
return;
}
List<Map<String, Object>> maps = csvReadService.readCsv("preprocessed_data/아파트_동별_소비전력_전력분배_2022-07-18~2023-08-30.csv");
for (Map<String, Object> map : maps) {
Building building = new Building(Timestamp.valueOf((String) map.get("TIMESTAMP")), Double.valueOf((String) map.get("561_CONSUMPTION(kW)")), Double.valueOf((String)map.get("562_CONSUMPTION(kW)")), Double.valueOf((String) map.get("563_CONSUMPTION(kW)")), Double.valueOf((String) map.get("561_bus")), Double.valueOf((String)map.get("562_bus")), Double.valueOf((String) map.get("563_bus")));
buildingRepository.save(building);
}
}
public void floorInit(){ // 층별 전력 사용량 DB 저장 - [10분 단위] 아파트_층별_소비전력
if(buildingPerTenMinuteRepository.findById(10000L).isPresent()){
return;
}
List<Map<String, Object>> maps = csvReadService.readCsv("preprocessed_data/[10분 단위]아파트_층별_소비전력_2023-03-16 06.30.00 ~ 2023-08-30 10.30.00.csv");
for (Map<String, Object> map : maps) {
BuildingPerTenMinute buildingPerTenMinute = new BuildingPerTenMinute(Timestamp.valueOf((String) map.get("TIMESTAMP")),Integer.valueOf((String) map.get("BUILDING")), Integer.valueOf((String)map.get("FLOOR")), Integer.valueOf((String) map.get("CONSUMPTION(W)")));
buildingPerTenMinuteRepository.save(buildingPerTenMinute);
}
}
public void buildingPerMinuteInit(){ // [1분 단위] 아파트_동별_소비전력_전력분배
if(buildingPerMinuteRepository.findById(10000L).isPresent()){
return;
}
List<Map<String, Object>> maps = csvReadService.readCsv("preprocessed_data/[1분 단위]아파트_동별_소비전력_전력분배_2022-07-18 00.00.00~2023-08-30 10.39.00.csv");
for (Map<String, Object> map : maps) {
BuildingPerMinute buildingPerMinute = new BuildingPerMinute(Timestamp.valueOf((String) map.get("TIMESTAMP")), Double.valueOf((String) map.get("561_CONSUMPTION(kW)")), Double.valueOf((String)map.get("562_CONSUMPTION(kW)")), Double.valueOf((String) map.get("563_CONSUMPTION(kW)")), Double.valueOf((String) map.get("561_bus")), Double.valueOf((String)map.get("562_bus")), Double.valueOf((String) map.get("563_bus")));
buildingPerMinuteRepository.save(buildingPerMinute);
}
}
}
< 결과 >


코드를 실행시키면, MySQL에 데이터가 잘 저장된 것을 확인할 수 있다.
6주차. SSE를 사용하여 실시간 알림 기능 구현
서버에서 AI 예측치와 실제 전력 사용량이 일정치 이상 차이날 경우, 즉 건물 내의 전력 사용에 대한 비정상적인 패턴이나 이상을 감지하는 경우, 건물 관리자에게 알림을 보내는 서비스를 구현하기 위해 SSE 알림 기능 서비스를 구현했다.
📌 SSE (Server-Sent Event)
Websocket 방식은 양방향 통신을 지원하여 클라이언트에서 서버로, 서버에서 클라이언트로 통신이 가능하다.
반면, SSE의 경우 서버에서 클라이언트로 단방향 통신만 가능하다.
SSE는 Websocke보다 가볍고, Springboot에서 구현이 쉽다는 장점이 있다.
이상치 알림 서비스의 경우, 서버에서 건물 관리자에게 단방향으로 통신하면 되므로, SSE 방식을 선택했다.
💡 SSE 알림 서비스
알림이 생성되면 실시간으로 팝업창 ("ex ) 새로운 알람이 도착하였습니다. 알림함을 확인해주세요") 이 뜬다.
알림함은 읽은 알림과 읽지 않은 알림함으로 구분하여 보여주려 한다.
사용법
- 사용자가 로그인에 성공하면, 자동으로 http://localhost:8080/api/sub 을 호출하여 유저의 토큰을 Authorization에 넣어 SSE 알림 기능을 구독한다.
- 잘 연결되었는지 확인하기 위해, 클라이언트 개발 용으로 http://localhost:8080/api/notification/server 를 POST하면 알림이 생성되고, 알림이 보내진 해당 유저에게 실시간으로 event가 발생한다.
- 새로운 알람을 확인하는 API로 http://localhost:8080/api/notifications/new 를 GET하면, 읽지 않은 알람들이 뜬다.
- 앞의 API인 http://localhost:8080/api/notifications/new를 호출하면 그 안에 담겨 있던 알람들은 자동으로 읽은 알람이 되도록 처리했다.
- 읽은 알람을 확인하는 API는 http://localhost:8080/api/notifications/old 를 GET 방식을 통해 확인할 수 있다.
< Controller >
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@RestController
@Tag(name="notification", description = "알림 기능")
@RequiredArgsConstructor
@RequestMapping("/api")
public class NotificationController {
private final NotificationRepository notificationRepository;
private final NotificationService notificationService;
private final MemberRepository memberRepository;
public static Map<Long, SseEmitter> sseEmitters = new ConcurrentHashMap<>();
public static final long TIMEOUT = 60L * 1000;
@Operation(method = "get", summary = "새로운 알람을 확인하는 API")
@GetMapping("/notifications/new")
public ResponseEntity<?> getUnreadNotifications(@CurrentUser UserPrincipal user) {
Member member= memberRepository.findById(user.getId()).orElseThrow(() ->
new IllegalArgumentException("유저 정보가 없습니다."));
List<Notification> notifications = notificationRepository.findByMemberAndCheckedOrderByCreatedDesc(member, false);
long numberOfChecked = notificationRepository.countByMemberAndChecked(member, false);
notificationService.markAsRead(notifications); // 확인한 알람으로 보내기
return ResponseEntity.ok(new ReceivedNotificationDto(numberOfChecked, notifications));
}
@Operation(method = "get", summary = "읽었던 알람을 확인하는 API")
@GetMapping("/notifications/old")
public ResponseEntity<?> getReadedNotifications(@CurrentUser UserPrincipal user) {
Member member= memberRepository.findById(user.getId()).orElseThrow(() ->
new IllegalArgumentException("유저 정보가 없습니다."));
List<Notification> notifications = notificationRepository.findByMemberAndCheckedOrderByCreatedDesc(member, true);
Long numberOfNotChecked = notificationRepository.countByMemberAndChecked(member, true);
return ResponseEntity.ok(new ReceivedNotificationDto(numberOfNotChecked, notifications));
}
@Operation(method = "delete", summary = "알림 모두 삭제하는 API")
@DeleteMapping("/notifications")
public ResponseEntity<?> deleteNotifications(@CurrentUser UserPrincipal user) {
Member member= memberRepository.findById(user.getId()).orElseThrow(() ->
new IllegalArgumentException("유저 정보가 없습니다."));
notificationRepository.deleteByMemberAndChecked(member, true);
ApiResponse apiResponse = ApiResponse.builder().check(true).information(Message.builder().message("알림을 모두 삭제했습니다.").build()).build();
return ResponseEntity.ok(apiResponse);
}
@Operation(method="get", summary = "SSE 알림 기능 구독하는 API")
@CrossOrigin
@GetMapping(value = "/sub", produces=MediaType.TEXT_EVENT_STREAM_VALUE, consumes = MediaType.ALL_VALUE)
public SseEmitter subscribe(@CurrentUser UserPrincipal user) {
// 토큰에서 user의 pk값 파싱
Long userId = user.getId();
// 현재 클라이언트를 위한 SseEmitter 생성
SseEmitter sseEmitter = new SseEmitter(TIMEOUT);
try {
// 연결!!
sseEmitter.send(SseEmitter.event().name("connect"));
} catch (IOException e) {
e.printStackTrace();
}
// user의 pk값을 key값으로 해서 SseEmitter를 저장
sseEmitters.put(userId, sseEmitter);
sseEmitter.onCompletion(() -> sseEmitters.remove(userId));
sseEmitter.onTimeout(() -> sseEmitters.remove(userId));
sseEmitter.onError((e) -> sseEmitters.remove(userId));
return sseEmitter;
}
@Operation(method = "post", summary = "클라이언트 개발을 위한 notification 추가하는 API")
@PostMapping("/notification/server")
public void serverSendNotification() { // 서버용 알림 보내기
SendNotificationDto sendNotificationDto = new SendNotificationDto("전력치 이상 경고 발생", "전력치 이상이 발생하였습니다. 궁금한 점은 시스템 관리자에게 문의하십시오.");
List<Member> members = memberRepository.findAllByAuthority(Authority.MANAGER);
System.out.println("members = " + members);
for (Member member : members) {
Notification notification = notificationService.addNotification(sendNotificationDto, member);
// 알림 이벤트 발행 메서드 호출
notificationService.notifyAddEvent(notification.getId());
}
}
}
< Service >
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.List;
import static energypa.bems.notification.controller.NotificationController.sseEmitters;
@RequiredArgsConstructor
@Service
public class NotificationService {
private final NotificationRepository notificationRepository;
private final MemberRepository memberRepository;
public void notifyAddEvent(Long notificationId) {
// 댓글에 대한 처리 후 해당 댓글이 달린 게시글의 pk값으로 게시글을 조회
Notification notification = notificationRepository.findById(notificationId).orElseThrow(
() -> new IllegalArgumentException("찾을 수 없는 알림입니다.")
);
Long userId = notification.getMember().getId();
if (sseEmitters.containsKey(userId)) {
SseEmitter sseEmitter = sseEmitters.get(userId);
try {
sseEmitter.send(SseEmitter.event().name("addNotification").data("알림이 추가되었습니다."));
} catch (Exception e) {
sseEmitters.remove(userId);
}
}
}
public void markAsRead(List<Notification> notifications) {
for (Notification notification : notifications) {
notificationRepository.updateChecked(notification.getId());
}
}
public Notification addNotification(SendNotificationDto sendNotificationDto, Member member) {
Notification notification = Notification.toEntity(sendNotificationDto, member);
notificationRepository.save(notification);
return notification;
}
public void serverSendNotification() { // 서버용 알림 보내기
SendNotificationDto sendNotificationDto = new SendNotificationDto("전력치 이상 경고 발생", "전력치 이상이 발생하였습니다. 궁금한 점은 시스템 관리자에게 문의하십시오.");
List<Member> members = memberRepository.findAllByAuthority(Authority.MANAGER);
System.out.println("members = " + members);
for (Member member : members) {
Notification notification = addNotification(sendNotificationDto, member);
// 알림 이벤트 발행 메서드 호출
notifyAddEvent(notification.getId());
}
}
}
< Repository >
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
@Transactional(readOnly = true)
public interface NotificationRepository extends JpaRepository<Notification, Long> {
long countByMemberAndChecked(Member member, boolean checked);
@Transactional
List<Notification> findByMemberAndCheckedOrderByCreatedDesc(Member member, boolean b);
@Transactional
void deleteByMemberAndChecked(Member member, boolean b);
@Modifying
@Transactional
@Query("update Notification p set p.checked = true where p.id = :id")
int updateChecked(@Param("id") Long id);
}
< domain >
import energypa.bems.login.domain.Member;
import energypa.bems.notification.dto.SendNotificationDto;
import jakarta.persistence.*;
import lombok.*;
import java.time.LocalDateTime;
@Entity
@Getter
@Builder
@AllArgsConstructor
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class Notification {
@Id
@GeneratedValue(strategy= GenerationType.IDENTITY)
@Column(name = "notification_id")
private Long id;
private String title;
private String message;
private boolean checked;
@ManyToOne
private Member member;
private LocalDateTime created;
public static Notification toEntity(SendNotificationDto dto, Member member) {
return Notification.builder()
.title(dto.getTitle())
.message(dto.getMessage())
.checked(false)
.member(member)
.created(LocalDateTime.now())
.build();
}
}
< 결과 >
1. 구독기능 ( /api/sub )

: 오른 쪽 Postman을 통해 유저의 토큰을 넣고 SSE 구독을 해준 후, 왼쪽 Swagger에서 해당 유저로 알림을 보내면, Postman에서 이벤트가 발생한 것을 확인할 수 있다.
2. 새로 들어온 알림을 확인하기 ( /api/notifications/new )

3. 이미 읽은 알림을 확인하기 ( /api/notifications/old )

5~6 주차 후기
새롭게 SSE 기능을 사용해봐서 재미있게 구현할 수 있었다. SSE 기능이 배포 시에 오류와 성능 저하가 많다고 들어서, 배포할 때는 kafka를 이용해서 알림 기능을 구현해야 겠다!!
유스케이스 작성 이후에, 어느 정도 프로젝트의 뼈대가 잡힌 것 같아 수월히 진행할 수 있었다.

'프로젝트 > DC 빌딩 관리 및 에너지 예측 시스템' 카테고리의 다른 글
[9~10 주차] 프로젝트 진행 과정 (ESS 전력 분배 모니터링 서비스) (1) | 2023.11.03 |
---|---|
[7~8 주차] 프로젝트 진행 과정 (건물 관리자 신청과 등록 API 구현, 전력 고지서 API, RDS 데이터 저장 ) (0) | 2023.10.29 |
[3~4 주차] 프로젝트 진행 과정 ( 로그인 구현, 유스케이스) (0) | 2023.10.11 |
[1~2 주차] 프로젝트 진행 과정 (주제 선정, 제안서) (1) | 2023.10.03 |
댓글