ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 빅데이터 수집
    빅데이터 2020. 6. 25. 18:36

    3. 빅데이터 수집

    • 빅데이터 수집 개요

    • 빅데이터 수집에 활용되는 기술

    • 수집 파일럿 실행 1단계 - 수집 아키텍처

    • 수집 파일럿 실행 2단계 - 수집 환경 구성

    • 수집 파일럿 실행 3단계 - 플럼 수집 기능 구현

    • 수집 파일럿 실행 4단계 - 카프카 수집 기능 구현

    • 수집 파일럿 실행 5단계 - 수집 기능 테스트

    빅데이터 수집 개요

    • 빅데이터 시스템 구축은 수집부터 시작

    • 수집이 전체 공정의 절반 이상 차지

    수집 범위 P106 그림 3-2 참고

    • 조직 전체 시스템

    • 외부 시스템(SNS, 포털, 정부기관 등)

    빅데이터 프로세싱

    과거

    • 수집 / 적재 후 맵리듀스 기반의 주기적인 배치성 분석

    현재

    • 수집과 동시에 분석

    • ESP(Event Stream Proccessing)

    • 파일럿 : 운저자 상태 정보 실시간 수집하고 다양한 운행패턴 이벤트 감지 기능 구현

    빅데이터 수집에 활용할 기술

    플럼(Flume)

    • http://flume.apache.org

    • 빅데이터 수집시 다양한 수집 요구 사항 해결

    • Source => Interceptor => Channel => Sink => 적재

    • Source : 다양한 원천 시스템 데이터를 수집하기 위한 컴포넌트 제공, 데이터로드

    • Interceptor: Source 와 Channel사이 데이터 필터링

    • Channel: 데이터 임시 저장, 데이터를 버퍼링 하는 컴포넌트로 메모리, 파일, 데이터베이스를 저장소로 활용

    • Sink : 최종 목적지(HDFS, Hive,..)에 저장하는 기능

    플럼 구성

    스마트카 상태 정보 CarLogMain 대용량

    • 3초간격, 100M/day(대용량) : 플럼 => 하둡

    스마트카 운전자 운행 정보DriverLogMain 실시간

    • 실시간, 400kb/1초 : 플럼 => 카프카

    카프카(Kafka) - MOM(Message Oriented Middleware)

    • 대규모 메세지성 데이터를 비동기 방식으로 중계하는 역할

    • 실시간 빠르게 발생하는 데이터 수집 시 이를 최종 목적지에 전달하기 전 중간에서 안정적인 버퍼링 처리 필요

    • 중간 저장소가 완충 역할을 함으로써 안정적인 수집 아키텍처 구성

    • 대규모 트랜잭션 => 중간버퍼링(카프카) => 목적지

    • 주키퍼와 연동 필요

    • Broker

      • 카프카 서비스의 인스터스, 다수의 Borker 클러스터로 구성

      • Topic 이 생성되는 물리적 서버

    • Topic : Broker에서 데이터 발생/소비 처리를 위한 저장소

    • Producer : Borker 특정 Topic 에 전송하는 역할

    • Consumer: Borker 특정 Topic 에 수신하는 역할

     

    플럼 에이전트 1

    • 스마트카 상태 정보를 기록한 로그 파일을 일별로 수집하기 위한 배치성 플럼 에이전트

    • SpoolDir Source : 약속된 로그 발생 디렉토리를 모니터링하다가 정의된 로그 파일 발생 시 해당 파일의 내용을 읽어서 수집하는 기능 제공

    • Memory Channel: ource로 부터 수집된 데이터를 메모리 Chennel에 중간 적재

    • Logger Sink: Channel 로부터 읽어 들인 데이터를 플럼 표준 로그 파일로 출력

     

    플럼 에이전트 2

    • 스마트카 운전자의 운행 정보를 실시간으로 수집하기 위한 실시간성 플럼 에이전트

    • Exec-Tail Source: 로그가 쌓이고 있는 파일에 Tail 파이프라인을 이용해 실시간으로 데이터를 수집하는 기능

    • Memory Channel: ource로 부터 수집된 데이터를 메모리 Chennel에 중간 적재

    • Kafka Sink: Channel 로부터 읽어 들인 데이터를 Kafka Borker의 특적 Topic에 비동기 전송

     

    기타

    • Flume Stdout: 플럼의 Logger-Sink를 통해 표준 출력 로그가 출력됨

    • Kafka Topic: 플럼의 Kafka-Sink는 수집된 실시간 로그를 임시 적재

    수집 파일럿 실행 2단계 - 수집 환경 구성

     

    플럼 설치

    • Cluster1 > 선택 메뉴 > 서비스 추가

    • 플럼 선택 > 호스트 선택 > Server02

     

    플럼 설치후

    • 설치 후 Heap 메모리 변경

    • Flume > 구성 > java heap 검색

    • 50 > 100

    • 플럼 > 선택 메뉴

     

    카프카 설치

    • Cluster1 > 선택 메뉴 > 서비스 추가

    • Kafka 선택 > Kafka Broker > 호스트 선택 > Server02

    • 변경 내용 검토 > 기본값 선택 후 계속

     

    설치 중 Failde to start service 오류 발생시

    • Heap 메모리 변경

    • 새로운 브라우저에 CM 접속

    • Kafka > 구성 > java heap 검색

    • 50 > 256

    • 설치페이지 > 재시도

    수집 파일럿 실행 3단계 - 플럼 수집 기능 구현

    • 2개의 에이전트 구현

     

    SmartCar 에이전트 생성

    • CM > Flume > 구성 > 구성 파일 부분 찾기

    • CM 에서 제공하는 플럼 기본 에이전트

    • Agent 이름 : tier1 => SmartCar_Agent

    • CH03-ex-1

     

    플럼에이전트에서 사용할 Source, Channel, Sink 변수 정의

    SmartCar_Agent.sources  = SmartCarInfo_SpoolSource
    SmartCar_Agent.channels = SmartCarInfo_Channel
    SmartCar_Agent.sinks    = SmartCarInfo_LoggerSink 

     

    spooldir은 특정 디렉터리를 모니터링 Source.type = spooldir

    • 새로은 파일이 생성되면 이벤트를 감지해 batchSize 값 만큼 읽어서 Channel로 전송

    SmartCar_Agent.sources.SmartCarInfo_SpoolSource.type = spooldir
    SmartCar_Agent.sources.SmartCarInfo_SpoolSource.spoolDir = /home/pilot-pjt/working/car-batch-log
    SmartCar_Agent.sources.SmartCarInfo_SpoolSource.deletePolicy = immediate
    SmartCar_Agent.sources.SmartCarInfo_SpoolSource.batchSize = 1000

    중간 적재할 채널 종류 설정 Channel.type = memory

    • 성능은 memory, 안정성은 file

    SmartCar_Agent.channels.SmartCarInfo_Channel.type = memory
    SmartCar_Agent.channels.SmartCarInfo_Channel.capacity  = 100000
    SmartCar_Agent.channels.SmartCarInfo_Channel.transactionCapacity  = 10000

    에이전트테스트 및 디버깅용 최종 목적지 Sink.type = logger

    • 플럼 표준 출력 로그 파일

    • /var/log/flume-ng/flume-cmf-flume-AGENT-server02.hadoop.com.log

    SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.type = logger

    interceptors 추가

    • Source와 Channel 중간에서 데이터 가공하는 역할

    • 플럼 Source에서 유입되는 데이터 중 일부를 수정/추가/가공/정제 등

    • 플럼 데이터 전송 단위 : Event = Header + Body

    • interceptors는 Header 특정값 추가, Body 데이터 가공

    • interceptors = filterInterceptor : 변수 선언

    • type = regex_filter: 정규 표현식을 이용해서 필터링

    • regex = ^\d{14} : 14자리 날짜 형식으로 시작하는 데이터

    • excludeEvents = false : true 이면 반대로 제외된 값 수집

    SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors = filterInterceptor
    
    SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.type = regex_filter
    SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.regex = ^\\d{14}
    SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.excludeEvents = false
    

    Source와 Channel Sink 연결

    SmartCar_Agent.sources.SmartCarInfo_SpoolSource.channels = SmartCarInfo_Channel
    SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.channel = SmartCarInfo_Channel

    DriverCarInfo 에이전트 생성

     

    변수 추가

    SmartCar_Agent.sources  = SmartCarInfo_SpoolSource DriverCarInfo_TailSource
    SmartCar_Agent.channels = SmartCarInfo_Channel DriverCarInfo_Channel
    SmartCar_Agent.sinks    = SmartCarInfo_LoggerSink DriverCarInfo_KafkaSink

    Source.type = exec

    • 외부 수행 명령 결과를 플럼 Event로 가져와 수집

    • tail -F /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log

    SmartCar_Agent.sources.DriverCarInfo_TailSource.type = exec
    SmartCar_Agent.sources.DriverCarInfo_TailSource.command = tail -F /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log
    SmartCar_Agent.sources.DriverCarInfo_TailSource.restart = true
    SmartCar_Agent.sources.DriverCarInfo_TailSource.batchSize = 1000

    interceptors 추가

    SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors = filterInterceptor2
    
    SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.type = regex_filter
    SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.regex = ^\\d{14}
    SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.excludeEvents = false

    ink.type = org.apache.flume.sink.kafka.KafkaSink

    • 카프카(브로커) 설치 서버 : server02.hadoop.com:9092

    SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
    SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.topic = SmartCar-Topic
    SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.brokerList = server02.hadoop.com:9092
    SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.requiredAcks = 1
    SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.batchSize = 1000

    Channel.type = memory

    SmartCar_Agent.channels.DriverCarInfo_Channel.type = memory
    SmartCar_Agent.channels.DriverCarInfo_Channel.capacity= 100000
    SmartCar_Agent.channels.DriverCarInfo_Channel.transactionCapacity = 10000

    Source와 Channel Sink 연결

    SmartCar_Agent.sources.DriverCarInfo_TailSource.channels = DriverCarInfo_Channel
    SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.channel = DriverCarInfo_Channel

    수집 파일럿 실행 4단계 - 카프카 기능 구현

    • 카프카 명령어를 이용 카프카 Broker 안에서 사용할 Topic 생성

    • Producer 명령어를 통해 데이터 전송

    • Consumer 명령어로 수신

     

    카프카 Topic 생성

    • Server02 SSH 접속

    • kafka-topics --create --zookeeper server02.hadoop.com:2181 --replication-factor 1 --partitions 1 --topic SmartCar-Topic

    • 결과: Created topic SmartCar-Topic

    • 이미 만들어져 있는 경우

    • kafka-topics --create --zookeeper server02.hadoop.com:2181 --topic SmartCar-Topic 입력해서 삭제

    • replication-factor 1: 다중 복제 수

    • partitions 1: 분산 저장 수

     

    카프카 Topic 생성

    • kafka-topics --delete --zookeeper server02.hadoop.com:2181 --topic SmartCar-Topic

    • 결과: Topic SmartCar-Topic is marked for deletion.

     

    카프카 Producer 사용

    • kafka-console-producer --broker-list server02.hadoop.com:9092 --topic SmartCar-Topic

     

    카프카 Consumer 사용

    • 새로운 Server02 SSH 접속

    • kafka-console-consumer --zookeeper server02.hadoop.com:2181 --topic SmartCar-Topic --from-beggining => zookeeper is not a recognized option <오류>

    • kafka-console-consumer --bootstrap-server server02.hadoop.com:9092 --topic SmartCar-Topic

     

    카프카 Producer, consumer 사용

    • 송수신 테스트

    • 수신 안될시 카프카 > 구성 > offsets.topic.replication.factor = 1로 변경

    수집 파일럿 실행 5단계 - 수집 기능 테스트

    • Sever02 SSH 접속 시뮬레이터 위치로 이동

    • cd /home/pilot-pjt/working

    • 3대 스마트카 상태 정보 수집

     

    운전정보 시뮬레이터 File 실행 DriverLogMain 날짜 차량수 &(backg)

    • java -cp bigdata.smartcar.loggen-1.0.jar com.wikibook.bigdata.smartcar.loggen.DriverLogMain 20190101 3 &

     

    Car정보 시뮬레이터 File 실행 CarLogMain 날짜 차량수 &(backg)

    • java -cp bigdata.smartcar.loggen-1.0.jar com.wikibook.bigdata.smartcar.loggen.CarLogMain 20190101 3 &

     

    시뮬레이터 정상 작동 확인

    • ls -l /home/pilot-pjt/working/SmartCar/

    • SmartCarStatusInfo_20190101.txt 생성 확인

    • ls -l /home/pilot-pjt/working/driver-realtime-log/

    • SmartCarDriverInfo.log 생성 확인

    • 실시간 확인 : tail -f /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log

    • 로그파일 Spooldir 경로로 이동

    • mv /home/pilot-pjt/working/SmartCar/SmartCarStatusInfo_20190101.txt /home/pilot-pjt/working/car-batch-log/

    수집 기능 점검

    스마트카 상태 정보 로그파일 => 표준 출력로그 tail 로 확인

    • tail -f /var/log/flume-ng/flume-cmf-flume-AGENT-server02.hadoop.com.log

    • Event, Header, Body

     

    실시간 운전정보 DriverCarInfo 수집 확인

    • kafka-console-consumer --bootstrap-server server02.hadoop.com:9092 --topic SmartCar-Topic

     

    시뮬레이터 종료

    • ps -ef |grep smartcar.log

    • kill -9 pid

     

    Tip _ 파일럿 환경 로그 확인

    • Hadoop 에코시스템 서버들 위치 : /var/log/각서버(cloudera,Hadoop, Oozie)등

    • Redis: /var/log/redis_6379.log

    • Storm : /home/pilot-pjt/storm/logs/

    • Zeppelin: /home/pilot-pjt/zeppelin-x.x.x-bin-all/logs

    '빅데이터' 카테고리의 다른 글

    빅데이터 분석  (0) 2020.07.01
    빅데이터 탐색  (0) 2020.07.01
    빅데이터 적재 - 실시간 로그 파일 적재  (0) 2020.06.26
    빅데이터 적재 - 대용량 로그 파일 적재  (0) 2020.06.25
    빅데이터 파일럿 프로젝트  (0) 2020.06.25
Designed by Tistory.