핀아의 저장소 ( •̀ ω •́ )✧

01_01. Spark와 RDD 본문

Big Data/Engineering

01_01. Spark와 RDD

_핀아_ 2023. 5. 12. 00:35

  • 빅데이터 처리를 위한 오픈소스 고속 분산처리 엔진
  • 연산 엔진을 대체하는 프로젝트이다.

✨ 자세한 내용은 아래 게시글을 참고해 주세요!

https://mydb-lib.tistory.com/entry/Hadoop-%ED%95%98%EB%91%A1Hadoop-%EC%9D%B4%EB%A1%A0

 

[Hadoop] 하둡(Hadoop) 이론

Hadoop이란? 분산환경에서 빅데이터를 저장하고 처리할 수 있는 자바 기반의 오픈 소스 프레임워크로 하둡 소프트웨어 라이브러리는 간단한 프로그래밍 모델을 사용하여 여러대의 컴퓨터 클러스

mydb-lib.tistory.com

  • 스파크는 빠르다는 장점이 있다.
    • 컴퓨터가 연산을 시작하면 하드디스크에서 CPU까지 데이터가 위로 이동한다.
    • 연산에 자주 쓰이는 데이터는 위로가며(CPU가 데이터를 더 빨리 접근하기 위해) 연산에 자주 쓰이지 않는 데이터는 아래로 간다.

  • 하지만 처리해야하는 데이터가 많을 때 메모리나 하드디스크에 저장이 안 된다. -> 여기서 디스크를 쓰게 되면 너무 느리다는 문제가 발생한다.
  • 따라서 데이터를 쪼개서 처리하는 방법을 고안하였다.

  • 스파크는 빅데이터를 메모리에 올려서 연상하기 때문에 속도가 빠르다
    • 고속 = 빅데이터의 In-Memory 연산

✅ Spark Cluster

💯 그렇다면 왜 내 컴퓨터에서 Spark 프로그램을 돌리면 판다스보다 느릴까?

  • Spark는 확장성을 고려해서 설계했기 때문이다.

  • 스파크의 노드는 계속 늘려서 성능을 증가시킬 수 있다.


Spark 특징

  • Hadoop MapReduce 보다 빠르다
    • 메모리 상에선 100배
    • 디스크 상에선 10배
  • Lazy Evaluation
    • Task를 정의할 때는 연산을 하지 않다가 결과가 필요할 때 연산한다.
    • 기다리면서 연산 과정을 최적화 할 수 있다.
    • lazy Evaluation은 사용자가 입력한 변환 연산들을 즉시 수행하지 않고 모아뒀다가 가장 최적의 수행 방법을 찾아 처리하는 것을 말한다.

 Spark 구성

  • Spark Core
  • Spark SQL
  • Spark Streaming
  • MLlib
  • GraphX

스파크의 핵심 데이터 모델

Resilient Distributed Dataset (RDD)

박스친 곳이 RDD 부분이다.

  • 탄력적인 분산 데이터셋이라는 뜻이다.
  • Resillient (회복력 있는, 변하지 않는)
    • 메모리 내부의 데이터가 손실 되었을 때, 유실된 파티션을 재연산해 데이터를 복구할 수 있다.
  • Distributed (분산된)
    • 스파크 클러스터를 통하여, 메모리에 분산 되어 저장된다.
  • Data
    • 데이터다.
  • 즉, RDD 는 여러 분산 노드에 걸쳐 저장되는, 변경이 불가능한 데이터의 집합이다.
  • 따라서, RDD 를 변경하기 위해선 새로운 RDD 를 생성하는 방법뿐이다.
  • 5가지 특징이 있다.
    1. 데이터 추상화
    2. 탄력적이고 불변하는 성질
    3. Type-safe
    4. Unstructured / Structured Data
    5. Lazy

1️⃣ 데이터 추상화

  • 데이터는 클러스터에 흩어져있지만 하나의 파일인 것처럼 사용이 가능하다.
  • 여러 노드에 담긴 데이터를 lines라는 명령어 하나로 사용 가능하다. (위 코드 참고)

2️⃣ 탄력적이고 불변하는 성질

  • 테이터가 여러군데서 연산되는 과정에서 여러 노드 중 하나가 망가지는 일이 생각보다 많이 일어난다.
    • 네트워크 장애
    • 하드웨어 / 메모리 문제
    • 알수없는 갖가지 이유
  • 하지만 스파크의 Immutable 성질로 인해서 RDD1이 변환을 거치면 RDD1이 바뀌는게 아니라 새로운 RDD2가 만들어진다. -> 변환을 거칠때마다 연산의 기록이 남는다.
  • RDD의 변환 과정은 하나의 비순환 그래프로 그릴 수 있게 된다. -> 덕분에 문제가 생길 경우 쉽게 전 RDD로 돌아갈 수 있다. = 탄력적이다.

3️⃣ Type-safe

  • 컴파일시 Type을 판별할 수 있어 문제를 일찍 발견할 수 있다.

4️⃣ Unstructured / Structured Data

  • Unstructured / Structured 둘 다 담을 수 있다.

5️⃣ Lazy

  • 게으르다 = 결과가 필요할때까지 연산을 하지 않는다.
  • 액션(Action)을 할 때까지 변환(Transformation)은 실행되지 않는다.
  • Action을 만나면 전부 실행된다.

=> 게으른 연산 (Lazy Evaluation)

  • Spark Operation은 두가지로 나뉠 수 있다.

💥 RDD를 사용하는 이유

  1. 유연하다.
  2. 짧은 코드로 할 수 있는게 많다.
  3. 개발할 때 무엇보다는 어떻게에 대해 더 생각하게 된다 (how-to)
    • 게으른 연산 덕분에 데이터가 어떻게 변환될지 생각하게 된다.
    • 데이터가 지나갈 길을 닦는 느낌

⌨ 코드

 

from pyspark import SparkConf, SparkContext
import pandas as pd

# Spark 설정
conf = SparkConf().setMaster("local").setAppName("uber-date-trips")
sc = SparkContext(conf=conf)

# 우리가 가져올 데이터가 있는 파일
directory = "???/spark/data"
filename = "fhvhv_tripdata_2020-03.csv"

# 데이터 로딩후 RDD 생성
lines = sc.textFile(f"file:///{directory}/{filename}")

# 필요한 부분만 추출하기
header = lines.first() 
filtered_lines = lines.filter(lambda row:row != header) 

dates = filtered_lines.map(lambda x: x.split(",")[2].split(" ")[0])

result = dates.countByValue()

# 아래는 Spark코드가 아닌 일반적인 파이썬 코드
# CSV로 결과값 저장 
pd.Series(result, name="trips").to_csv("trips_date.csv")

import matplotlib.pyplot as plt
%matplotlib inline  

trips = pd.read_csv("trips_date.csv")
trips.plot()

from pyspark import SparkConf, SparkContext
import pandas as pd
  • SparkConf : 사용자가 재정의해서 쓸 수 있는 설정 옵션들에 대한 키와 값을 갖고있는 객체
  • SparkContext : Spark 클러스터와 연결시켜주는 객체
    • Spark 모든 기능에 접근할 수 있는 시작점
    • Spark는 분산환경에서 동작하기 때문에 Driver Program 을 구동시키기 위해 SparkContext가 필요
    • SparkContext는 프로그램당 하나만 만들 수 있고 사용후에는 종료

  • SparkContext 객체는 내부에 자바로 동작하는 Py4J의 SparkContext와 연결
  • 이 덕분에 파이썬으로 코딩하면서도 자바 위에서 동작하는 프로그램을 작성할 수 있다.
  • RDD를 만들 수 있다.
# Spark 설정
conf = SparkConf().setMaster("local").setAppName("uber-date-trips")
sc = SparkContext(conf=conf)
  • setMaster("local") - 분산된 환경이 아닌 개발용 로컬 환경을 쓴다는 뜻
  • setAppName - Spark UI에서 확인 가능한 스파크 앱 이름
# 데이터 로딩후 RDD 생성

lines = sc.textFile(f"file:///{directory}/{filename}")

# 필요한 부분만 추출하기
header = lines.first()
filtered_lines = lines.filter(lambda row: rox != header)


# 아래와 같은 코드
def f(row):
    return row != header
lines.filter(f)

dates = filtered_lines.map(lambda x: x.split(",")[2].split(" ")[0])


# 아래와 같다
def f(x):
    return x.split(",")[2].split(" ")[0]
  • map()함수로 우리가 원하는 부분만 추출 할 수 있다.

result = dates.countByValue()
  • 값이 얼마나 등장하는지 세준다.

# result는 이제 더이상 RDD가 아닌 Python 객체

pd.Series(result, name="trips")/to_csv("trips_date.csv")

import matplotlib.pyplot as plt
%matplotlib inline  

trips = pd.read_csv("trips_date.csv")
trips.plot()

'Big Data > Engineering' 카테고리의 다른 글

01_03. RDD Transformations and Actions  (2) 2023.05.13
01_02. 병렬처리에서 분산처리까지  (0) 2023.05.12
[Hadoop] 하둡(Hadoop) 이론  (1) 2023.05.11
00_04. 프로젝트 INTRO  (1) 2023.05.11
00_03. Dataflow Orchestration  (0) 2023.05.11
Comments