핀아의 저장소 ( •̀ ω •́ )✧
01_03. RDD Transformations and Actions 본문
Transformations & Actions
- Transformations
- 결과값으로 새로운 RDD를 반환
- Actions가 실행되기 전까진 실행되지 않는다.
- 지연 실행(Lazy Execution)
- Actions
- 결과값을 연산하여 출력하거나 저장
- 파이썬 오브젝트나 리스트를 반환한다.
- 즉시 실행(Eager Execution)
✅ Transformations
- map()
- flatMap()
- filter()
- distinct()
- reduceByKey()
- groupByKey()
- mapValues()
- flatMapValues()
- sortByKey()
◾ Narrow Transformation
- 1:1 변환
- filter(), map(), flatMap(), sample(), union()
- 1열을 조작하기 위해 다른 열/파티션의 데이터를 쓸 필요가 없다.
- 정렬이 필요하지 않은 경우
- 정렬 같은 경우 새로운 RDD를 만들기 위해서 한 열을 처리를 할때 다른 열을 처리하여 비교해야 하는데 이때 많은 통신이 이루어진다.
- 따라서, 이런 비교가 필요하지않은 경우가 일대일 변환이다.
◾ Wide Transformation
- Shuffling
- Intersection and join, distinct, cartesian, reduceByKey(), groupByKey()
- 아웃풋 RDD의 파티션에 다른 파티션의 데이터가 들어갈 수 있음
- Wide Transformation은 데이터가 정렬을 할 경우 파티션 아래쪽의 값이 다른 파티션의 위로 들어갈수도 있기에 많은 통신과 리소스가 필요하다.
- 따라서 많은 리소스를 요구하게 되고, 이 자원들에 대한 최적화가 필요하다.
✅ Actions
- collect()
- collect는 RDD안의 값을 모두 가져오므로 실제 환경에서는 지양해야 한다.
- 값 전부를 가져오다보니 낭비가 심해서 spark를 쓰는 의미가 없어짐
- count()
- countByValue()
- RDD안의 value값을 카운트해서 알려준다.
- take()
- RDD안의 값을 몇개만 가져올 수 있도록 한다.
- top()
- reduce()
- fold()
- foreach()
- 요소들을 하나하나씩 꺼내서 하나의 함수를 적용시키는데 쓰인다.
- 값을 리턴하지 않음
⌨ 코드
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("transformations_actions")
sc = SparkContext(conf = conf)
sc.getConf().getAll() # conf 내용 출력
sc.stop()
sc = SparkContext(conf=conf)
- sparkcontext를 만들고나서 또 만들려면 에러가 나기 때문에 중간에 stop()을 이용해서 기존의 spark context를 멈춰주고 새로 만들어야 한다.
foods = sc.parallelize(["짜장면", "마라탕", "짬뽕", "떡볶이", "쌀국수", "짬뽕", "짜장면", "짜장면", "짜장면", "라면", "우동", "라면"])
foods
foods.collect()
- parallelize()는 리스트로부터 RDD를 만듦
- textFile()은 텍스트로부터 RDD를 만듦
foods.countByValue()
foods.take(5) # foods 리스트에서 앞에서부터 5개 추출
foods.first() # foods 리스트에서 맨 앞 추출
foods.count() # foods 리스트 개수 구하기
foods.distinct().count()
- distinct()는 Transformation이기 때문에 바로 실행되지 않음 -> Actions인 count()가 있어야 실행
foods.foreach(lambda x: print(x))
- 요소들을 하나하나씩 꺼내서 하나의 함수를 적용시키는데 쓰인다.
- foreach는 Actions이기 때문에 worker 노드에서 실행되게 된다.
- 우리가 주로 코딩하는 곳은 driver 프로그램이므로 Actions가 실행되는 곳과 달라 결과값이 보이지 않는다.
- worker 노드 어딘가에서 print()가 실행됐을 것
- 어딘가에 로그로 저장할때 유용하게 사용
sc.parallelize([1, 2, 3]).map(lambda x: x + 2).collect() # [3, 4, 5]
sc.parallelize([1, 2, 3]).map(lambda x: x * 2).collect() # [2, 4, 6]
movies = [
"그린 북",
"매트릭스",
"토이 스토리",
"캐스트 어웨이",
"포드 V 페라리",
"보헤미안 랩소디",
"빽 투 더 퓨처",
"반지의 제왕",
"죽은 시인의 사회"
]
moviesRDD = sc.parallelize(movies)
moviesRDD.collect()
flatMovies = moviesRDD.flatMap(lambda x: x.split(" "))
flatMovies.collect()
- flatMap(): 어떻게 펼쳐볼 것인가
filteredMovies = flatMovies.filter(lambda x: x!= "매트릭스") # 매트릭스 제외하고 불러오기
filteredMovies.collect()
num1 = sc.parallelize([1, 2, 3, 4])
num2 = sc.parallelize([4, 5, 6, 7, 8, 9, 10])
# numUnion.sample(T)
# [1, 2, 3, 4, 4, 5, 6, 7, 8, 9, 10]
numUnion.sample(True, .5).collect() # -> [3, 4, 5, 6, 8, 9]
numUnion.sample(True, .5, seed=5).collect() -> [1, 4, 6, 9, 9, 10]
- 랜덤으로 추출되는 숫자들
- 매개변수 첫번째는 뽑힌 숫자를 다시 집어넣고 샘플링 할지 여부를 결정 (True인 경우 다시 집어 넣음)
- 매개변수 두번째는 원래 리스트에서 몇퍼센트 사이즈로 만들 것인지 결정
foods = sc.parallelize(["짜장면", "마라탕", "짬뽕", "떡볶이", "쌀국수", "짬뽕", "짜장면", "짜장면", "짜장면", "라면", "우동", "라면", "치킨", "돈까스", "회", "햄버거", "피자"])
# groupBy
foodsGroup = foods.groupBy(lambda x: x[0]) #앞글자를 기준으로 정렬
res = foodsGroup.collect()
for (k, v) in res:
print(k, list(v))
nums = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
list(nums.groupBy(lambda x: x % 2).collect()[1][1]) # [2, 4, 6, 8, 10]
- 2로 나누어 떨어지는 요소들만 리스트화
'Big Data > Engineering' 카테고리의 다른 글
01_05. Cluster Topology (1) | 2023.05.13 |
---|---|
01_04. Cache & Persist (0) | 2023.05.13 |
01_02. 병렬처리에서 분산처리까지 (0) | 2023.05.12 |
01_01. Spark와 RDD (0) | 2023.05.12 |
[Hadoop] 하둡(Hadoop) 이론 (1) | 2023.05.11 |
Comments