-
3. 빅데이터 수집
-
빅데이터 수집 개요
-
빅데이터 수집에 활용되는 기술
-
수집 파일럿 실행 1단계 - 수집 아키텍처
-
수집 파일럿 실행 2단계 - 수집 환경 구성
-
수집 파일럿 실행 3단계 - 플럼 수집 기능 구현
-
수집 파일럿 실행 4단계 - 카프카 수집 기능 구현
-
수집 파일럿 실행 5단계 - 수집 기능 테스트
빅데이터 수집 개요
-
빅데이터 시스템 구축은 수집부터 시작
-
수집이 전체 공정의 절반 이상 차지
수집 범위 P106 그림 3-2 참고
-
조직 전체 시스템
-
외부 시스템(SNS, 포털, 정부기관 등)
빅데이터 프로세싱
과거
-
수집 / 적재 후 맵리듀스 기반의 주기적인 배치성 분석
현재
-
수집과 동시에 분석
-
ESP(Event Stream Proccessing)
-
파일럿 : 운저자 상태 정보 실시간 수집하고 다양한 운행패턴 이벤트 감지 기능 구현
빅데이터 수집에 활용할 기술
플럼(Flume)
-
빅데이터 수집시 다양한 수집 요구 사항 해결
-
Source => Interceptor => Channel => Sink => 적재
-
Source : 다양한 원천 시스템 데이터를 수집하기 위한 컴포넌트 제공, 데이터로드
-
Interceptor: Source 와 Channel사이 데이터 필터링
-
Channel: 데이터 임시 저장, 데이터를 버퍼링 하는 컴포넌트로 메모리, 파일, 데이터베이스를 저장소로 활용
-
Sink : 최종 목적지(HDFS, Hive,..)에 저장하는 기능
플럼 구성
스마트카 상태 정보 CarLogMain 대용량
-
3초간격, 100M/day(대용량) : 플럼 => 하둡
스마트카 운전자 운행 정보DriverLogMain 실시간
-
실시간, 400kb/1초 : 플럼 => 카프카
카프카(Kafka) - MOM(Message Oriented Middleware)
-
대규모 메세지성 데이터를 비동기 방식으로 중계하는 역할
-
실시간 빠르게 발생하는 데이터 수집 시 이를 최종 목적지에 전달하기 전 중간에서 안정적인 버퍼링 처리 필요
-
중간 저장소가 완충 역할을 함으로써 안정적인 수집 아키텍처 구성
-
대규모 트랜잭션 => 중간버퍼링(카프카) => 목적지
-
주키퍼와 연동 필요
-
Broker
-
카프카 서비스의 인스터스, 다수의 Borker 클러스터로 구성
-
Topic 이 생성되는 물리적 서버
-
-
Topic : Broker에서 데이터 발생/소비 처리를 위한 저장소
-
Producer : Borker 특정 Topic 에 전송하는 역할
-
Consumer: Borker 특정 Topic 에 수신하는 역할
플럼 에이전트 1
-
스마트카 상태 정보를 기록한 로그 파일을 일별로 수집하기 위한 배치성 플럼 에이전트
-
SpoolDir Source : 약속된 로그 발생 디렉토리를 모니터링하다가 정의된 로그 파일 발생 시 해당 파일의 내용을 읽어서 수집하는 기능 제공
-
Memory Channel: ource로 부터 수집된 데이터를 메모리 Chennel에 중간 적재
-
Logger Sink: Channel 로부터 읽어 들인 데이터를 플럼 표준 로그 파일로 출력
플럼 에이전트 2
-
스마트카 운전자의 운행 정보를 실시간으로 수집하기 위한 실시간성 플럼 에이전트
-
Exec-Tail Source: 로그가 쌓이고 있는 파일에 Tail 파이프라인을 이용해 실시간으로 데이터를 수집하는 기능
-
Memory Channel: ource로 부터 수집된 데이터를 메모리 Chennel에 중간 적재
-
Kafka Sink: Channel 로부터 읽어 들인 데이터를 Kafka Borker의 특적 Topic에 비동기 전송
기타
-
Flume Stdout: 플럼의 Logger-Sink를 통해 표준 출력 로그가 출력됨
-
Kafka Topic: 플럼의 Kafka-Sink는 수집된 실시간 로그를 임시 적재
수집 파일럿 실행 2단계 - 수집 환경 구성
-
CM을 이용해 플럼, 카프가 설치 : Server02
플럼 설치
-
Cluster1 > 선택 메뉴 > 서비스 추가
-
플럼 선택 > 호스트 선택 > Server02
플럼 설치후
-
설치 후 Heap 메모리 변경
-
Flume > 구성 > java heap 검색
-
50 > 100
-
플럼 > 선택 메뉴
카프카 설치
-
Cluster1 > 선택 메뉴 > 서비스 추가
-
Kafka 선택 > Kafka Broker > 호스트 선택 > Server02
-
변경 내용 검토 > 기본값 선택 후 계속
설치 중 Failde to start service 오류 발생시
-
Heap 메모리 변경
-
새로운 브라우저에 CM 접속
-
Kafka > 구성 > java heap 검색
-
50 > 256
-
설치페이지 > 재시도
수집 파일럿 실행 3단계 - 플럼 수집 기능 구현
-
2개의 에이전트 구현
SmartCar 에이전트 생성
-
CM > Flume > 구성 > 구성 파일 부분 찾기
-
CM 에서 제공하는 플럼 기본 에이전트
-
Agent 이름 : tier1 => SmartCar_Agent
-
CH03-ex-1
플럼에이전트에서 사용할 Source, Channel, Sink 변수 정의
SmartCar_Agent.sources = SmartCarInfo_SpoolSource SmartCar_Agent.channels = SmartCarInfo_Channel SmartCar_Agent.sinks = SmartCarInfo_LoggerSink
spooldir은 특정 디렉터리를 모니터링 Source.type = spooldir
-
새로은 파일이 생성되면 이벤트를 감지해 batchSize 값 만큼 읽어서 Channel로 전송
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.type = spooldir SmartCar_Agent.sources.SmartCarInfo_SpoolSource.spoolDir = /home/pilot-pjt/working/car-batch-log SmartCar_Agent.sources.SmartCarInfo_SpoolSource.deletePolicy = immediate SmartCar_Agent.sources.SmartCarInfo_SpoolSource.batchSize = 1000
중간 적재할 채널 종류 설정 Channel.type = memory
-
성능은 memory, 안정성은 file
SmartCar_Agent.channels.SmartCarInfo_Channel.type = memory SmartCar_Agent.channels.SmartCarInfo_Channel.capacity = 100000 SmartCar_Agent.channels.SmartCarInfo_Channel.transactionCapacity = 10000
에이전트테스트 및 디버깅용 최종 목적지 Sink.type = logger
-
플럼 표준 출력 로그 파일
-
/var/log/flume-ng/flume-cmf-flume-AGENT-server02.hadoop.com.log
SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.type = logger
interceptors 추가
-
Source와 Channel 중간에서 데이터 가공하는 역할
-
플럼 Source에서 유입되는 데이터 중 일부를 수정/추가/가공/정제 등
-
플럼 데이터 전송 단위 : Event = Header + Body
-
interceptors는 Header 특정값 추가, Body 데이터 가공
-
interceptors = filterInterceptor : 변수 선언
-
type = regex_filter: 정규 표현식을 이용해서 필터링
-
regex = ^\d{14} : 14자리 날짜 형식으로 시작하는 데이터
-
excludeEvents = false : true 이면 반대로 제외된 값 수집
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors = filterInterceptor SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.type = regex_filter SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.regex = ^\\d{14} SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.excludeEvents = false
Source와 Channel Sink 연결
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.channels = SmartCarInfo_Channel SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.channel = SmartCarInfo_Channel
DriverCarInfo 에이전트 생성
변수 추가
SmartCar_Agent.sources = SmartCarInfo_SpoolSource DriverCarInfo_TailSource SmartCar_Agent.channels = SmartCarInfo_Channel DriverCarInfo_Channel SmartCar_Agent.sinks = SmartCarInfo_LoggerSink DriverCarInfo_KafkaSink
Source.type = exec
-
외부 수행 명령 결과를 플럼 Event로 가져와 수집
-
tail -F /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log
SmartCar_Agent.sources.DriverCarInfo_TailSource.type = exec SmartCar_Agent.sources.DriverCarInfo_TailSource.command = tail -F /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log SmartCar_Agent.sources.DriverCarInfo_TailSource.restart = true SmartCar_Agent.sources.DriverCarInfo_TailSource.batchSize = 1000
interceptors 추가
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors = filterInterceptor2 SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.type = regex_filter SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.regex = ^\\d{14} SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.excludeEvents = false
ink.type = org.apache.flume.sink.kafka.KafkaSink
-
카프카(브로커) 설치 서버 : server02.hadoop.com:9092
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.type = org.apache.flume.sink.kafka.KafkaSink SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.topic = SmartCar-Topic SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.brokerList = server02.hadoop.com:9092 SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.requiredAcks = 1 SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.batchSize = 1000
Channel.type = memory
SmartCar_Agent.channels.DriverCarInfo_Channel.type = memory SmartCar_Agent.channels.DriverCarInfo_Channel.capacity= 100000 SmartCar_Agent.channels.DriverCarInfo_Channel.transactionCapacity = 10000
Source와 Channel Sink 연결
SmartCar_Agent.sources.DriverCarInfo_TailSource.channels = DriverCarInfo_Channel SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.channel = DriverCarInfo_Channel
수집 파일럿 실행 4단계 - 카프카 기능 구현
-
카프카 명령어를 이용 카프카 Broker 안에서 사용할 Topic 생성
-
Producer 명령어를 통해 데이터 전송
-
Consumer 명령어로 수신
카프카 Topic 생성
-
Server02 SSH 접속
-
kafka-topics --create --zookeeper server02.hadoop.com:2181 --replication-factor 1 --partitions 1 --topic SmartCar-Topic
-
결과: Created topic SmartCar-Topic
-
이미 만들어져 있는 경우
-
kafka-topics --create --zookeeper server02.hadoop.com:2181 --topic SmartCar-Topic 입력해서 삭제
-
replication-factor 1: 다중 복제 수
-
partitions 1: 분산 저장 수
카프카 Topic 생성
-
kafka-topics --delete --zookeeper server02.hadoop.com:2181 --topic SmartCar-Topic
-
결과: Topic SmartCar-Topic is marked for deletion.
카프카 Producer 사용
-
kafka-console-producer --broker-list server02.hadoop.com:9092 --topic SmartCar-Topic
카프카 Consumer 사용
-
새로운 Server02 SSH 접속
-
kafka-console-consumer --zookeeper server02.hadoop.com:2181 --topic SmartCar-Topic --from-beggining => zookeeper is not a recognized option <오류>
-
kafka-console-consumer --bootstrap-server server02.hadoop.com:9092 --topic SmartCar-Topic
카프카 Producer, consumer 사용
-
송수신 테스트
-
수신 안될시 카프카 > 구성 > offsets.topic.replication.factor = 1로 변경
수집 파일럿 실행 5단계 - 수집 기능 테스트
-
Sever02 SSH 접속 시뮬레이터 위치로 이동
-
cd /home/pilot-pjt/working
-
3대 스마트카 상태 정보 수집
운전정보 시뮬레이터 File 실행 DriverLogMain 날짜 차량수 &(backg)
-
java -cp bigdata.smartcar.loggen-1.0.jar com.wikibook.bigdata.smartcar.loggen.DriverLogMain 20190101 3 &
Car정보 시뮬레이터 File 실행 CarLogMain 날짜 차량수 &(backg)
-
java -cp bigdata.smartcar.loggen-1.0.jar com.wikibook.bigdata.smartcar.loggen.CarLogMain 20190101 3 &
시뮬레이터 정상 작동 확인
-
ls -l /home/pilot-pjt/working/SmartCar/
-
SmartCarStatusInfo_20190101.txt 생성 확인
-
ls -l /home/pilot-pjt/working/driver-realtime-log/
-
SmartCarDriverInfo.log 생성 확인
-
실시간 확인 : tail -f /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log
-
로그파일 Spooldir 경로로 이동
-
mv /home/pilot-pjt/working/SmartCar/SmartCarStatusInfo_20190101.txt /home/pilot-pjt/working/car-batch-log/
수집 기능 점검
스마트카 상태 정보 로그파일 => 표준 출력로그 tail 로 확인
-
tail -f /var/log/flume-ng/flume-cmf-flume-AGENT-server02.hadoop.com.log
-
Event, Header, Body
실시간 운전정보 DriverCarInfo 수집 확인
-
kafka-console-consumer --bootstrap-server server02.hadoop.com:9092 --topic SmartCar-Topic
시뮬레이터 종료
-
ps -ef |grep smartcar.log
-
kill -9 pid
Tip _ 파일럿 환경 로그 확인
-
Hadoop 에코시스템 서버들 위치 : /var/log/각서버(cloudera,Hadoop, Oozie)등
-
Redis: /var/log/redis_6379.log
-
Storm : /home/pilot-pjt/storm/logs/
-
Zeppelin: /home/pilot-pjt/zeppelin-x.x.x-bin-all/logs
'빅데이터' 카테고리의 다른 글
빅데이터 분석 (0) 2020.07.01 빅데이터 탐색 (0) 2020.07.01 빅데이터 적재 - 실시간 로그 파일 적재 (0) 2020.06.26 빅데이터 적재 - 대용량 로그 파일 적재 (0) 2020.06.25 빅데이터 파일럿 프로젝트 (0) 2020.06.25 -