핀아의 저장소 ( •̀ ω •́ )✧

01_02. 병렬처리에서 분산처리까지 본문

Big Data/Engineering

01_02. 병렬처리에서 분산처리까지

_핀아_ 2023. 5. 12. 01:25

병렬처리 (Parallel) vs 분산처리 (Distributed)

✅ Data-Parallel

RDD.map(<task>)
  1. 데이터를 여러개로 쪼개고
  2. 여러 쓰레드에서 각자 task를 적용
  3. 각자 만든 결과값을 합치는 과정

파란색이 노드, 하늘색이 데이터를 뜻한다.

✅ Distributed Data-Parallel

  1. 데이터를 여러개로 쪼개서 여러 노드로 보낸다.
  2. 여러 노드에서 각자 독립적으로 task를 적용한다.
  3. 각자 만든 결과값을 합치는 과정이다.
  • 노드간 통신같이 신경써야 될 것이 늘어난다.
  • 하지만 Spark를 이용하면 분산된 환경에서도 일반적인 병렬처리를 하듯 코드를 짜는게 가능하다.
    • Spark는 분산된 환경에서 데이터 병렬 모델을 구현해 추상화 시켜주기 때문이다.
    • 하지만 노드간 통신 속도를 신경써야 한다.

분산처리와 Latency(속도)

  • 분산처리로 넘어가면서 신경써야될 문제가 많아졌다.
    • 부분 실패 - 노드 몇개가 프로그램과 상관없는 이유로 인해 실패
    • 속도 - 많은 네트워크 통신을 필요로 하는 작업은 속도가 저하

예시

RDD.map(A).filter(B).reduceByKey(C).take(100)
RDD.map(A).reduceByKey(C).filter(B).take(100)
  • filter와 reduceByKey 위치만 바뀐 코드이지만 성능이 다르다.
  • 첫번째 코드가 훨씬 성능이 좋다.
    • reduceByKey 때문이다. reduceByKey 함수는 여러 노드에서 데이터를 불러오기 때문에 통신을 필요로 하는데, 필터를 거쳐 데이터 양을 줄인 다음에 불러오게 되면 통신에 필요로 하는 양이 적어져 속도가 빨라지게 된다.

  • 네트워크는 메모리 연산에 비해 100만배 정도 느리다
  • 따라서 Spark의 작업이 뒷단에서 어떻게 돌아갈지 상상하면서 코딩하는 것이 중요하다.

Key-Value RDD

  • structrued data를 스파크와 연계해서 쓸 수 있게 해주는 도구 중 하나이다.
  • Key와 Value쌍을 갖는다.
  • (Key, Value) 쌍을 갖기 때문에 Pairs RDD라고도 불린다.
  • 간단한 데이터베이스처럼 다룰 수 있다.

✅ Key-Value RDD 개념

  • Single Value RDD
    • 간단하게 개수를 세거나  unstructrued data를 다룸
    • ex) 텍스트에 등장하는 단어 수 세기(날짜)
  • Key-Value RDD
    • 키를 기준으로 평균을 구하거나 통계를 구함
    • ex) 넷플릭스 드라마가 받은 평균 별점
  • Key와 Value쌍을 가진다.
    • ex) 지역 ID 별로 택시 운행 수는 어떻게 될까? -> Key(지역 ID), Value(운행 수)
    • ex) 드라마 별로 별점 수 모아보기, 평균 구하기
    • ex) 이커머스 사이트에서 상품당 별 평점 구하기
  • 코드상으로는 많이 다르지 않다.

단순 값 뿐 아니라 List도 Value가 될 수 있다.

 Key-Value RDD로 할 수 있는 것들 _ Reduction

  • reduceByKey(): 키값을 기준으로 테스크 처리, Key를 기준으로 데이터를 줄이는데 사용
  • groupByKey(): 키값을 기준으로 벨류를 묶는다.
  • sortByKey(): 키값을 기준으로 정렬
  • keys: 키값 추출
  • values(): 벨류값 추출
pairs = rdd.map(lambda x: (x, 1))
count = pairs.reduceByKey(lambda a, b: a + b)

 Key-Value RDD로 할 수 있는 것들 _ Join

  • join
  • rightOuterJoin
  • leftOuterJoin
  • subtractByKey

Mapping Values

  • Key Value 데이터에서 Key를 바꾸지 않는 경우 map()대신 value만 다루는 mapValues() 함수를 쓰도록 한다.
    • Spark 내부에서 파티션을 유지할 수 있어 더욱 효율적이다.
mapValues()

flatMapValues()
  • Value만 다루는 연산들이지만 RDD에서 key는 유지된다.

⌨ 코드

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("category-review-average")
sc = SparkContext(conf=conf)

directory = "/Users/keon/fastcampus/data-engineering/01-spark/data"
filename = "restaurant_reviews.csv"

lines = sc.textFile(f"file:///{directory}/{filename}")

lines.collect()

def parse(row):
    fields = row.split(",")
    category = fields[2]
    reviews = int(fields[3])
    return (category, reviews)
    
    
categoryReviews = filtered_lines.map(parse)

categoryReviews.collect()

categoryReviewsCount = categoryReviews.mapValues(lambda x: (x, 1))

categoryReviewsCount.collect()

reduced = categoryReviewsCount.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

reduced.collect()

averages = reduced.mapValues(lambda x: x[0] / x[1])
averages.collect()

lines = sc.textFile(f"file:///{directory}/{filename}")
lines.collect()

filtered_lines = lines.filter(lambda fow: row != header)
filtered_lines.collect()

def parse(row):
	#'0,짜장면,중식,125'
    fields = row.split(",")
    category = fields[2]
    reviews = int(fields[3])
    return (category, reviews)
    
categoryReviews = filtered_lines.map(parse)
categoryReviews.collect()

categoryReviewsCount = categoryReviews.mapValues(lambda x: (x, 1))
categoryReviewsCount.collect()

reduced = categoryReviewsCount.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
reduced.collect()

# 평균구하기
averages = reduced.mapValues(lambda x: x[0] / x[1])
averages.collect()

'Big Data > Engineering' 카테고리의 다른 글

01_04. Cache & Persist  (0) 2023.05.13
01_03. RDD Transformations and Actions  (2) 2023.05.13
01_01. Spark와 RDD  (0) 2023.05.12
[Hadoop] 하둡(Hadoop) 이론  (1) 2023.05.11
00_04. 프로젝트 INTRO  (1) 2023.05.11
Comments