핀아의 저장소 ( •̀ ω •́ )✧
01_02. 병렬처리에서 분산처리까지 본문
병렬처리 (Parallel) vs 분산처리 (Distributed)
✅ Data-Parallel
RDD.map(<task>)
- 데이터를 여러개로 쪼개고
- 여러 쓰레드에서 각자 task를 적용
- 각자 만든 결과값을 합치는 과정
✅ Distributed Data-Parallel
- 데이터를 여러개로 쪼개서 여러 노드로 보낸다.
- 여러 노드에서 각자 독립적으로 task를 적용한다.
- 각자 만든 결과값을 합치는 과정이다.
- 노드간 통신같이 신경써야 될 것이 늘어난다.
- 하지만 Spark를 이용하면 분산된 환경에서도 일반적인 병렬처리를 하듯 코드를 짜는게 가능하다.
- Spark는 분산된 환경에서 데이터 병렬 모델을 구현해 추상화 시켜주기 때문이다.
- 하지만 노드간 통신 속도를 신경써야 한다.
분산처리와 Latency(속도)
- 분산처리로 넘어가면서 신경써야될 문제가 많아졌다.
- 부분 실패 - 노드 몇개가 프로그램과 상관없는 이유로 인해 실패
- 속도 - 많은 네트워크 통신을 필요로 하는 작업은 속도가 저하
예시
RDD.map(A).filter(B).reduceByKey(C).take(100)
RDD.map(A).reduceByKey(C).filter(B).take(100)
- filter와 reduceByKey 위치만 바뀐 코드이지만 성능이 다르다.
- 첫번째 코드가 훨씬 성능이 좋다.
- reduceByKey 때문이다. reduceByKey 함수는 여러 노드에서 데이터를 불러오기 때문에 통신을 필요로 하는데, 필터를 거쳐 데이터 양을 줄인 다음에 불러오게 되면 통신에 필요로 하는 양이 적어져 속도가 빨라지게 된다.
- 네트워크는 메모리 연산에 비해 100만배 정도 느리다
- 따라서 Spark의 작업이 뒷단에서 어떻게 돌아갈지 상상하면서 코딩하는 것이 중요하다.
Key-Value RDD
- structrued data를 스파크와 연계해서 쓸 수 있게 해주는 도구 중 하나이다.
- Key와 Value쌍을 갖는다.
- (Key, Value) 쌍을 갖기 때문에 Pairs RDD라고도 불린다.
- 간단한 데이터베이스처럼 다룰 수 있다.
✅ Key-Value RDD 개념
- Single Value RDD
- 간단하게 개수를 세거나 unstructrued data를 다룸
- ex) 텍스트에 등장하는 단어 수 세기(날짜)
- Key-Value RDD
- 키를 기준으로 평균을 구하거나 통계를 구함
- ex) 넷플릭스 드라마가 받은 평균 별점
- Key와 Value쌍을 가진다.
- ex) 지역 ID 별로 택시 운행 수는 어떻게 될까? -> Key(지역 ID), Value(운행 수)
- ex) 드라마 별로 별점 수 모아보기, 평균 구하기
- ex) 이커머스 사이트에서 상품당 별 평점 구하기
- 코드상으로는 많이 다르지 않다.
✅ Key-Value RDD로 할 수 있는 것들 _ Reduction
- reduceByKey(): 키값을 기준으로 테스크 처리, Key를 기준으로 데이터를 줄이는데 사용
- groupByKey(): 키값을 기준으로 벨류를 묶는다.
- sortByKey(): 키값을 기준으로 정렬
- keys: 키값 추출
- values(): 벨류값 추출
pairs = rdd.map(lambda x: (x, 1))
count = pairs.reduceByKey(lambda a, b: a + b)
✅ Key-Value RDD로 할 수 있는 것들 _ Join
- join
- rightOuterJoin
- leftOuterJoin
- subtractByKey
✅ Mapping Values
- Key Value 데이터에서 Key를 바꾸지 않는 경우 map()대신 value만 다루는 mapValues() 함수를 쓰도록 한다.
- Spark 내부에서 파티션을 유지할 수 있어 더욱 효율적이다.
mapValues()
flatMapValues()
- Value만 다루는 연산들이지만 RDD에서 key는 유지된다.
⌨ 코드
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("category-review-average")
sc = SparkContext(conf=conf)
directory = "/Users/keon/fastcampus/data-engineering/01-spark/data"
filename = "restaurant_reviews.csv"
lines = sc.textFile(f"file:///{directory}/{filename}")
lines.collect()
def parse(row):
fields = row.split(",")
category = fields[2]
reviews = int(fields[3])
return (category, reviews)
categoryReviews = filtered_lines.map(parse)
categoryReviews.collect()
categoryReviewsCount = categoryReviews.mapValues(lambda x: (x, 1))
categoryReviewsCount.collect()
reduced = categoryReviewsCount.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
reduced.collect()
averages = reduced.mapValues(lambda x: x[0] / x[1])
averages.collect()
lines = sc.textFile(f"file:///{directory}/{filename}")
lines.collect()
filtered_lines = lines.filter(lambda fow: row != header)
filtered_lines.collect()
def parse(row):
#'0,짜장면,중식,125'
fields = row.split(",")
category = fields[2]
reviews = int(fields[3])
return (category, reviews)
categoryReviews = filtered_lines.map(parse)
categoryReviews.collect()
categoryReviewsCount = categoryReviews.mapValues(lambda x: (x, 1))
categoryReviewsCount.collect()
reduced = categoryReviewsCount.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
reduced.collect()
# 평균구하기
averages = reduced.mapValues(lambda x: x[0] / x[1])
averages.collect()
'Big Data > Engineering' 카테고리의 다른 글
01_04. Cache & Persist (0) | 2023.05.13 |
---|---|
01_03. RDD Transformations and Actions (2) | 2023.05.13 |
01_01. Spark와 RDD (0) | 2023.05.12 |
[Hadoop] 하둡(Hadoop) 이론 (1) | 2023.05.11 |
00_04. 프로젝트 INTRO (1) | 2023.05.11 |
Comments