빅데이터

빅데이터 수집

2020. 6. 25. 18:36

3. 빅데이터 수집

  • 빅데이터 수집 개요

  • 빅데이터 수집에 활용되는 기술

  • 수집 파일럿 실행 1단계 - 수집 아키텍처

  • 수집 파일럿 실행 2단계 - 수집 환경 구성

  • 수집 파일럿 실행 3단계 - 플럼 수집 기능 구현

  • 수집 파일럿 실행 4단계 - 카프카 수집 기능 구현

  • 수집 파일럿 실행 5단계 - 수집 기능 테스트

빅데이터 수집 개요

  • 빅데이터 시스템 구축은 수집부터 시작

  • 수집이 전체 공정의 절반 이상 차지

수집 범위 P106 그림 3-2 참고

  • 조직 전체 시스템

  • 외부 시스템(SNS, 포털, 정부기관 등)

빅데이터 프로세싱

과거

  • 수집 / 적재 후 맵리듀스 기반의 주기적인 배치성 분석

현재

  • 수집과 동시에 분석

  • ESP(Event Stream Proccessing)

  • 파일럿 : 운저자 상태 정보 실시간 수집하고 다양한 운행패턴 이벤트 감지 기능 구현

빅데이터 수집에 활용할 기술

플럼(Flume)

  • http://flume.apache.org

  • 빅데이터 수집시 다양한 수집 요구 사항 해결

  • Source => Interceptor => Channel => Sink => 적재

  • Source : 다양한 원천 시스템 데이터를 수집하기 위한 컴포넌트 제공, 데이터로드

  • Interceptor: Source 와 Channel사이 데이터 필터링

  • Channel: 데이터 임시 저장, 데이터를 버퍼링 하는 컴포넌트로 메모리, 파일, 데이터베이스를 저장소로 활용

  • Sink : 최종 목적지(HDFS, Hive,..)에 저장하는 기능

플럼 구성

스마트카 상태 정보 CarLogMain 대용량

  • 3초간격, 100M/day(대용량) : 플럼 => 하둡

스마트카 운전자 운행 정보DriverLogMain 실시간

  • 실시간, 400kb/1초 : 플럼 => 카프카

카프카(Kafka) - MOM(Message Oriented Middleware)

  • 대규모 메세지성 데이터를 비동기 방식으로 중계하는 역할

  • 실시간 빠르게 발생하는 데이터 수집 시 이를 최종 목적지에 전달하기 전 중간에서 안정적인 버퍼링 처리 필요

  • 중간 저장소가 완충 역할을 함으로써 안정적인 수집 아키텍처 구성

  • 대규모 트랜잭션 => 중간버퍼링(카프카) => 목적지

  • 주키퍼와 연동 필요

  • Broker

    • 카프카 서비스의 인스터스, 다수의 Borker 클러스터로 구성

    • Topic 이 생성되는 물리적 서버

  • Topic : Broker에서 데이터 발생/소비 처리를 위한 저장소

  • Producer : Borker 특정 Topic 에 전송하는 역할

  • Consumer: Borker 특정 Topic 에 수신하는 역할

 

플럼 에이전트 1

  • 스마트카 상태 정보를 기록한 로그 파일을 일별로 수집하기 위한 배치성 플럼 에이전트

  • SpoolDir Source : 약속된 로그 발생 디렉토리를 모니터링하다가 정의된 로그 파일 발생 시 해당 파일의 내용을 읽어서 수집하는 기능 제공

  • Memory Channel: ource로 부터 수집된 데이터를 메모리 Chennel에 중간 적재

  • Logger Sink: Channel 로부터 읽어 들인 데이터를 플럼 표준 로그 파일로 출력

 

플럼 에이전트 2

  • 스마트카 운전자의 운행 정보를 실시간으로 수집하기 위한 실시간성 플럼 에이전트

  • Exec-Tail Source: 로그가 쌓이고 있는 파일에 Tail 파이프라인을 이용해 실시간으로 데이터를 수집하는 기능

  • Memory Channel: ource로 부터 수집된 데이터를 메모리 Chennel에 중간 적재

  • Kafka Sink: Channel 로부터 읽어 들인 데이터를 Kafka Borker의 특적 Topic에 비동기 전송

 

기타

  • Flume Stdout: 플럼의 Logger-Sink를 통해 표준 출력 로그가 출력됨

  • Kafka Topic: 플럼의 Kafka-Sink는 수집된 실시간 로그를 임시 적재

수집 파일럿 실행 2단계 - 수집 환경 구성

 

플럼 설치

  • Cluster1 > 선택 메뉴 > 서비스 추가

  • 플럼 선택 > 호스트 선택 > Server02

 

플럼 설치후

  • 설치 후 Heap 메모리 변경

  • Flume > 구성 > java heap 검색

  • 50 > 100

  • 플럼 > 선택 메뉴

 

카프카 설치

  • Cluster1 > 선택 메뉴 > 서비스 추가

  • Kafka 선택 > Kafka Broker > 호스트 선택 > Server02

  • 변경 내용 검토 > 기본값 선택 후 계속

 

설치 중 Failde to start service 오류 발생시

  • Heap 메모리 변경

  • 새로운 브라우저에 CM 접속

  • Kafka > 구성 > java heap 검색

  • 50 > 256

  • 설치페이지 > 재시도

수집 파일럿 실행 3단계 - 플럼 수집 기능 구현

  • 2개의 에이전트 구현

 

SmartCar 에이전트 생성

  • CM > Flume > 구성 > 구성 파일 부분 찾기

  • CM 에서 제공하는 플럼 기본 에이전트

  • Agent 이름 : tier1 => SmartCar_Agent

  • CH03-ex-1

 

플럼에이전트에서 사용할 Source, Channel, Sink 변수 정의

SmartCar_Agent.sources  = SmartCarInfo_SpoolSource
SmartCar_Agent.channels = SmartCarInfo_Channel
SmartCar_Agent.sinks    = SmartCarInfo_LoggerSink 

 

spooldir은 특정 디렉터리를 모니터링 Source.type = spooldir

  • 새로은 파일이 생성되면 이벤트를 감지해 batchSize 값 만큼 읽어서 Channel로 전송

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.type = spooldir
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.spoolDir = /home/pilot-pjt/working/car-batch-log
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.deletePolicy = immediate
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.batchSize = 1000

중간 적재할 채널 종류 설정 Channel.type = memory

  • 성능은 memory, 안정성은 file

SmartCar_Agent.channels.SmartCarInfo_Channel.type = memory
SmartCar_Agent.channels.SmartCarInfo_Channel.capacity  = 100000
SmartCar_Agent.channels.SmartCarInfo_Channel.transactionCapacity  = 10000

에이전트테스트 및 디버깅용 최종 목적지 Sink.type = logger

  • 플럼 표준 출력 로그 파일

  • /var/log/flume-ng/flume-cmf-flume-AGENT-server02.hadoop.com.log

SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.type = logger

interceptors 추가

  • Source와 Channel 중간에서 데이터 가공하는 역할

  • 플럼 Source에서 유입되는 데이터 중 일부를 수정/추가/가공/정제 등

  • 플럼 데이터 전송 단위 : Event = Header + Body

  • interceptors는 Header 특정값 추가, Body 데이터 가공

  • interceptors = filterInterceptor : 변수 선언

  • type = regex_filter: 정규 표현식을 이용해서 필터링

  • regex = ^\d{14} : 14자리 날짜 형식으로 시작하는 데이터

  • excludeEvents = false : true 이면 반대로 제외된 값 수집

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors = filterInterceptor

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.type = regex_filter
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.regex = ^\\d{14}
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.excludeEvents = false

Source와 Channel Sink 연결

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.channels = SmartCarInfo_Channel
SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.channel = SmartCarInfo_Channel

DriverCarInfo 에이전트 생성

 

변수 추가

SmartCar_Agent.sources  = SmartCarInfo_SpoolSource DriverCarInfo_TailSource
SmartCar_Agent.channels = SmartCarInfo_Channel DriverCarInfo_Channel
SmartCar_Agent.sinks    = SmartCarInfo_LoggerSink DriverCarInfo_KafkaSink

Source.type = exec

  • 외부 수행 명령 결과를 플럼 Event로 가져와 수집

  • tail -F /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log

SmartCar_Agent.sources.DriverCarInfo_TailSource.type = exec
SmartCar_Agent.sources.DriverCarInfo_TailSource.command = tail -F /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log
SmartCar_Agent.sources.DriverCarInfo_TailSource.restart = true
SmartCar_Agent.sources.DriverCarInfo_TailSource.batchSize = 1000

interceptors 추가

SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors = filterInterceptor2

SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.type = regex_filter
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.regex = ^\\d{14}
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.excludeEvents = false

ink.type = org.apache.flume.sink.kafka.KafkaSink

  • 카프카(브로커) 설치 서버 : server02.hadoop.com:9092

SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.topic = SmartCar-Topic
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.brokerList = server02.hadoop.com:9092
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.requiredAcks = 1
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.batchSize = 1000

Channel.type = memory

SmartCar_Agent.channels.DriverCarInfo_Channel.type = memory
SmartCar_Agent.channels.DriverCarInfo_Channel.capacity= 100000
SmartCar_Agent.channels.DriverCarInfo_Channel.transactionCapacity = 10000

Source와 Channel Sink 연결

SmartCar_Agent.sources.DriverCarInfo_TailSource.channels = DriverCarInfo_Channel
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.channel = DriverCarInfo_Channel

수집 파일럿 실행 4단계 - 카프카 기능 구현

  • 카프카 명령어를 이용 카프카 Broker 안에서 사용할 Topic 생성

  • Producer 명령어를 통해 데이터 전송

  • Consumer 명령어로 수신

 

카프카 Topic 생성

  • Server02 SSH 접속

  • kafka-topics --create --zookeeper server02.hadoop.com:2181 --replication-factor 1 --partitions 1 --topic SmartCar-Topic

  • 결과: Created topic SmartCar-Topic

  • 이미 만들어져 있는 경우

  • kafka-topics --create --zookeeper server02.hadoop.com:2181 --topic SmartCar-Topic 입력해서 삭제

  • replication-factor 1: 다중 복제 수

  • partitions 1: 분산 저장 수

 

카프카 Topic 생성

  • kafka-topics --delete --zookeeper server02.hadoop.com:2181 --topic SmartCar-Topic

  • 결과: Topic SmartCar-Topic is marked for deletion.

 

카프카 Producer 사용

  • kafka-console-producer --broker-list server02.hadoop.com:9092 --topic SmartCar-Topic

 

카프카 Consumer 사용

  • 새로운 Server02 SSH 접속

  • kafka-console-consumer --zookeeper server02.hadoop.com:2181 --topic SmartCar-Topic --from-beggining => zookeeper is not a recognized option <오류>

  • kafka-console-consumer --bootstrap-server server02.hadoop.com:9092 --topic SmartCar-Topic

 

카프카 Producer, consumer 사용

  • 송수신 테스트

  • 수신 안될시 카프카 > 구성 > offsets.topic.replication.factor = 1로 변경

수집 파일럿 실행 5단계 - 수집 기능 테스트

  • Sever02 SSH 접속 시뮬레이터 위치로 이동

  • cd /home/pilot-pjt/working

  • 3대 스마트카 상태 정보 수집

 

운전정보 시뮬레이터 File 실행 DriverLogMain 날짜 차량수 &(backg)

  • java -cp bigdata.smartcar.loggen-1.0.jar com.wikibook.bigdata.smartcar.loggen.DriverLogMain 20190101 3 &

 

Car정보 시뮬레이터 File 실행 CarLogMain 날짜 차량수 &(backg)

  • java -cp bigdata.smartcar.loggen-1.0.jar com.wikibook.bigdata.smartcar.loggen.CarLogMain 20190101 3 &

 

시뮬레이터 정상 작동 확인

  • ls -l /home/pilot-pjt/working/SmartCar/

  • SmartCarStatusInfo_20190101.txt 생성 확인

  • ls -l /home/pilot-pjt/working/driver-realtime-log/

  • SmartCarDriverInfo.log 생성 확인

  • 실시간 확인 : tail -f /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log

  • 로그파일 Spooldir 경로로 이동

  • mv /home/pilot-pjt/working/SmartCar/SmartCarStatusInfo_20190101.txt /home/pilot-pjt/working/car-batch-log/

수집 기능 점검

스마트카 상태 정보 로그파일 => 표준 출력로그 tail 로 확인

  • tail -f /var/log/flume-ng/flume-cmf-flume-AGENT-server02.hadoop.com.log

  • Event, Header, Body

 

실시간 운전정보 DriverCarInfo 수집 확인

  • kafka-console-consumer --bootstrap-server server02.hadoop.com:9092 --topic SmartCar-Topic

 

시뮬레이터 종료

  • ps -ef |grep smartcar.log

  • kill -9 pid

 

Tip _ 파일럿 환경 로그 확인

  • Hadoop 에코시스템 서버들 위치 : /var/log/각서버(cloudera,Hadoop, Oozie)등

  • Redis: /var/log/redis_6379.log

  • Storm : /home/pilot-pjt/storm/logs/

  • Zeppelin: /home/pilot-pjt/zeppelin-x.x.x-bin-all/logs