핀아의 저장소 ( •̀ ω •́ )✧

01_03. RDD Transformations and Actions 본문

Big Data/Engineering

01_03. RDD Transformations and Actions

_핀아_ 2023. 5. 13. 01:24

Transformations & Actions

  • Transformations 
    • 결과값으로 새로운 RDD를 반환
    • Actions가 실행되기 전까진 실행되지 않는다.
    • 지연 실행(Lazy Execution)
  • Actions
    • 결과값을 연산하여 출력하거나 저장
    • 파이썬 오브젝트나 리스트를 반환한다.
    • 즉시 실행(Eager Execution)

✅ Transformations

  • map()
  • flatMap()
  • filter()
  • distinct()
  • reduceByKey()
  • groupByKey()
  • mapValues()
  • flatMapValues()
  • sortByKey()

◾ Narrow Transformation

  • 1:1 변환
  • filter(), map(), flatMap(), sample(), union()
  • 1열을 조작하기 위해 다른 열/파티션의 데이터를 쓸 필요가 없다.
  • 정렬이 필요하지 않은 경우
    • 정렬 같은 경우 새로운 RDD를 만들기 위해서 한 열을 처리를 할때 다른 열을 처리하여 비교해야 하는데 이때 많은 통신이 이루어진다.
    • 따라서, 이런 비교가 필요하지않은 경우가 일대일 변환이다.

◾ Wide Transformation

  • Shuffling
  • Intersection and join, distinct, cartesian, reduceByKey(), groupByKey()
  • 아웃풋 RDD의 파티션에 다른 파티션의 데이터가 들어갈 수 있음
  • Wide Transformation은 데이터가 정렬을 할 경우 파티션 아래쪽의 값이 다른 파티션의 위로 들어갈수도 있기에 많은 통신과 리소스가 필요하다.
  • 따라서 많은 리소스를 요구하게 되고, 이 자원들에 대한 최적화가 필요하다.

✅ Actions

  • collect()
    • collect는 RDD안의 값을 모두 가져오므로 실제 환경에서는 지양해야 한다.
    • 값 전부를 가져오다보니 낭비가 심해서 spark를 쓰는 의미가 없어짐
  • count()
  • countByValue()
    • RDD안의 value값을 카운트해서 알려준다.
  • take()
    • RDD안의 값을 몇개만 가져올 수 있도록 한다.
  • top()
  • reduce()
  • fold()
  • foreach()
    • 요소들을 하나하나씩 꺼내서 하나의 함수를 적용시키는데 쓰인다.
    • 값을 리턴하지 않음

⌨ 코드

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("transformations_actions")
sc = SparkContext(conf = conf)
sc.getConf().getAll() # conf 내용 출력

sc.stop()
sc = SparkContext(conf=conf)
  • sparkcontext를 만들고나서 또 만들려면 에러가 나기 때문에 중간에 stop()을 이용해서 기존의 spark context를 멈춰주고 새로 만들어야 한다.
foods = sc.parallelize(["짜장면", "마라탕", "짬뽕", "떡볶이", "쌀국수", "짬뽕", "짜장면", "짜장면", "짜장면",  "라면", "우동", "라면"])

foods

foods.collect()
  • parallelize()는 리스트로부터 RDD를 만듦
  • textFile()은 텍스트로부터 RDD를 만듦
foods.countByValue()

foods.take(5) # foods 리스트에서 앞에서부터 5개 추출
foods.first() # foods 리스트에서 맨 앞 추출
foods.count() # foods 리스트 개수 구하기
foods.distinct().count()
  • distinct()는 Transformation이기 때문에 바로 실행되지 않음 -> Actions인 count()가 있어야 실행
foods.foreach(lambda x: print(x))
  • 요소들을 하나하나씩 꺼내서 하나의 함수를 적용시키는데 쓰인다.
  • foreach는 Actions이기 때문에 worker 노드에서 실행되게 된다.
  • 우리가 주로 코딩하는 곳은 driver 프로그램이므로 Actions가 실행되는 곳과 달라 결과값이 보이지 않는다.
  • worker 노드 어딘가에서 print()가 실행됐을 것
  • 어딘가에 로그로 저장할때 유용하게 사용
sc.parallelize([1, 2, 3]).map(lambda x: x + 2).collect() # [3, 4, 5]

sc.parallelize([1, 2, 3]).map(lambda x: x * 2).collect() # [2, 4, 6]
movies = [
    "그린 북",
    "매트릭스",
    "토이 스토리",
    "캐스트 어웨이",
    "포드 V 페라리",
    "보헤미안 랩소디",
    "빽 투 더 퓨처",
    "반지의 제왕",
    "죽은 시인의 사회"
]

moviesRDD = sc.parallelize(movies)
moviesRDD.collect()

flatMovies = moviesRDD.flatMap(lambda x: x.split(" "))

flatMovies.collect()
  • flatMap(): 어떻게 펼쳐볼 것인가

filteredMovies = flatMovies.filter(lambda x: x!= "매트릭스") # 매트릭스 제외하고 불러오기
filteredMovies.collect()

num1 = sc.parallelize([1, 2, 3, 4])
num2 = sc.parallelize([4, 5, 6, 7, 8, 9, 10])

집합 관련

# numUnion.sample(T)
# [1, 2, 3, 4, 4, 5, 6, 7, 8, 9, 10]

numUnion.sample(True, .5).collect() # -> [3, 4, 5, 6, 8, 9]

numUnion.sample(True, .5, seed=5).collect() -> [1, 4, 6, 9, 9, 10]
  • 랜덤으로 추출되는 숫자들
  • 매개변수 첫번째는 뽑힌 숫자를 다시 집어넣고 샘플링 할지 여부를 결정 (True인 경우 다시 집어 넣음)
  • 매개변수 두번째는 원래 리스트에서 몇퍼센트 사이즈로 만들 것인지 결정
foods = sc.parallelize(["짜장면", "마라탕", "짬뽕", "떡볶이", "쌀국수", "짬뽕", "짜장면", "짜장면", "짜장면",  "라면", "우동", "라면", "치킨", "돈까스", "회", "햄버거", "피자"])

# groupBy
foodsGroup = foods.groupBy(lambda x: x[0]) #앞글자를 기준으로 정렬
res = foodsGroup.collect()

for (k, v) in res:
	print(k, list(v))

nums = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
list(nums.groupBy(lambda x: x % 2).collect()[1][1]) # [2, 4, 6, 8, 10]
  • 2로 나누어 떨어지는 요소들만 리스트화

'Big Data > Engineering' 카테고리의 다른 글

01_05. Cluster Topology  (1) 2023.05.13
01_04. Cache & Persist  (0) 2023.05.13
01_02. 병렬처리에서 분산처리까지  (0) 2023.05.12
01_01. Spark와 RDD  (0) 2023.05.12
[Hadoop] 하둡(Hadoop) 이론  (1) 2023.05.11
Comments