핀아의 저장소 ( •̀ ω •́ )✧
01_01. Spark와 RDD 본문
- 빅데이터 처리를 위한 오픈소스 고속 분산처리 엔진
- 연산 엔진을 대체하는 프로젝트이다.
✨ 자세한 내용은 아래 게시글을 참고해 주세요!
https://mydb-lib.tistory.com/entry/Hadoop-%ED%95%98%EB%91%A1Hadoop-%EC%9D%B4%EB%A1%A0
[Hadoop] 하둡(Hadoop) 이론
Hadoop이란? 분산환경에서 빅데이터를 저장하고 처리할 수 있는 자바 기반의 오픈 소스 프레임워크로 하둡 소프트웨어 라이브러리는 간단한 프로그래밍 모델을 사용하여 여러대의 컴퓨터 클러스
mydb-lib.tistory.com
- 스파크는 빠르다는 장점이 있다.
- 컴퓨터가 연산을 시작하면 하드디스크에서 CPU까지 데이터가 위로 이동한다.
- 연산에 자주 쓰이는 데이터는 위로가며(CPU가 데이터를 더 빨리 접근하기 위해) 연산에 자주 쓰이지 않는 데이터는 아래로 간다.
- 하지만 처리해야하는 데이터가 많을 때 메모리나 하드디스크에 저장이 안 된다. -> 여기서 디스크를 쓰게 되면 너무 느리다는 문제가 발생한다.
- 따라서 데이터를 쪼개서 처리하는 방법을 고안하였다.
- 스파크는 빅데이터를 메모리에 올려서 연상하기 때문에 속도가 빠르다
- 고속 = 빅데이터의 In-Memory 연산
✅ Spark Cluster
💯 그렇다면 왜 내 컴퓨터에서 Spark 프로그램을 돌리면 판다스보다 느릴까?
- Spark는 확장성을 고려해서 설계했기 때문이다.
- 스파크의 노드는 계속 늘려서 성능을 증가시킬 수 있다.
✅ Spark 특징
- Hadoop MapReduce 보다 빠르다
- 메모리 상에선 100배
- 디스크 상에선 10배
- Lazy Evaluation
- Task를 정의할 때는 연산을 하지 않다가 결과가 필요할 때 연산한다.
- 기다리면서 연산 과정을 최적화 할 수 있다.
- lazy Evaluation은 사용자가 입력한 변환 연산들을 즉시 수행하지 않고 모아뒀다가 가장 최적의 수행 방법을 찾아 처리하는 것을 말한다.
✅ Spark 구성
- Spark Core
- Spark SQL
- Spark Streaming
- MLlib
- GraphX
✅ 스파크의 핵심 데이터 모델
Resilient Distributed Dataset (RDD)
- 탄력적인 분산 데이터셋이라는 뜻이다.
- Resillient (회복력 있는, 변하지 않는)
- 메모리 내부의 데이터가 손실 되었을 때, 유실된 파티션을 재연산해 데이터를 복구할 수 있다.
- Distributed (분산된)
- 스파크 클러스터를 통하여, 메모리에 분산 되어 저장된다.
- Data
- 데이터다.
- 즉, RDD 는 여러 분산 노드에 걸쳐 저장되는, 변경이 불가능한 데이터의 집합이다.
- 따라서, RDD 를 변경하기 위해선 새로운 RDD 를 생성하는 방법뿐이다.
- 5가지 특징이 있다.
- 데이터 추상화
- 탄력적이고 불변하는 성질
- Type-safe
- Unstructured / Structured Data
- Lazy
1️⃣ 데이터 추상화
- 데이터는 클러스터에 흩어져있지만 하나의 파일인 것처럼 사용이 가능하다.
- 여러 노드에 담긴 데이터를 lines라는 명령어 하나로 사용 가능하다. (위 코드 참고)
2️⃣ 탄력적이고 불변하는 성질
- 테이터가 여러군데서 연산되는 과정에서 여러 노드 중 하나가 망가지는 일이 생각보다 많이 일어난다.
- 네트워크 장애
- 하드웨어 / 메모리 문제
- 알수없는 갖가지 이유
- 하지만 스파크의 Immutable 성질로 인해서 RDD1이 변환을 거치면 RDD1이 바뀌는게 아니라 새로운 RDD2가 만들어진다. -> 변환을 거칠때마다 연산의 기록이 남는다.
- RDD의 변환 과정은 하나의 비순환 그래프로 그릴 수 있게 된다. -> 덕분에 문제가 생길 경우 쉽게 전 RDD로 돌아갈 수 있다. = 탄력적이다.
3️⃣ Type-safe
- 컴파일시 Type을 판별할 수 있어 문제를 일찍 발견할 수 있다.
4️⃣ Unstructured / Structured Data
- Unstructured / Structured 둘 다 담을 수 있다.
5️⃣ Lazy
- 게으르다 = 결과가 필요할때까지 연산을 하지 않는다.
- 액션(Action)을 할 때까지 변환(Transformation)은 실행되지 않는다.
- Action을 만나면 전부 실행된다.
=> 게으른 연산 (Lazy Evaluation)
- Spark Operation은 두가지로 나뉠 수 있다.
💥 RDD를 사용하는 이유
- 유연하다.
- 짧은 코드로 할 수 있는게 많다.
- 개발할 때 무엇보다는 어떻게에 대해 더 생각하게 된다 (how-to)
- 게으른 연산 덕분에 데이터가 어떻게 변환될지 생각하게 된다.
- 데이터가 지나갈 길을 닦는 느낌
⌨ 코드
from pyspark import SparkConf, SparkContext
import pandas as pd
# Spark 설정
conf = SparkConf().setMaster("local").setAppName("uber-date-trips")
sc = SparkContext(conf=conf)
# 우리가 가져올 데이터가 있는 파일
directory = "???/spark/data"
filename = "fhvhv_tripdata_2020-03.csv"
# 데이터 로딩후 RDD 생성
lines = sc.textFile(f"file:///{directory}/{filename}")
# 필요한 부분만 추출하기
header = lines.first()
filtered_lines = lines.filter(lambda row:row != header)
dates = filtered_lines.map(lambda x: x.split(",")[2].split(" ")[0])
result = dates.countByValue()
# 아래는 Spark코드가 아닌 일반적인 파이썬 코드
# CSV로 결과값 저장
pd.Series(result, name="trips").to_csv("trips_date.csv")
import matplotlib.pyplot as plt
%matplotlib inline
trips = pd.read_csv("trips_date.csv")
trips.plot()
from pyspark import SparkConf, SparkContext
import pandas as pd
- SparkConf : 사용자가 재정의해서 쓸 수 있는 설정 옵션들에 대한 키와 값을 갖고있는 객체
- SparkContext : Spark 클러스터와 연결시켜주는 객체
- Spark 모든 기능에 접근할 수 있는 시작점
- Spark는 분산환경에서 동작하기 때문에 Driver Program 을 구동시키기 위해 SparkContext가 필요
- SparkContext는 프로그램당 하나만 만들 수 있고 사용후에는 종료
- SparkContext 객체는 내부에 자바로 동작하는 Py4J의 SparkContext와 연결
- 이 덕분에 파이썬으로 코딩하면서도 자바 위에서 동작하는 프로그램을 작성할 수 있다.
- RDD를 만들 수 있다.
# Spark 설정
conf = SparkConf().setMaster("local").setAppName("uber-date-trips")
sc = SparkContext(conf=conf)
- setMaster("local") - 분산된 환경이 아닌 개발용 로컬 환경을 쓴다는 뜻
- setAppName - Spark UI에서 확인 가능한 스파크 앱 이름
# 데이터 로딩후 RDD 생성
lines = sc.textFile(f"file:///{directory}/{filename}")
# 필요한 부분만 추출하기
header = lines.first()
filtered_lines = lines.filter(lambda row: rox != header)
# 아래와 같은 코드
def f(row):
return row != header
lines.filter(f)
dates = filtered_lines.map(lambda x: x.split(",")[2].split(" ")[0])
# 아래와 같다
def f(x):
return x.split(",")[2].split(" ")[0]
- map()함수로 우리가 원하는 부분만 추출 할 수 있다.
result = dates.countByValue()
- 값이 얼마나 등장하는지 세준다.
# result는 이제 더이상 RDD가 아닌 Python 객체
pd.Series(result, name="trips")/to_csv("trips_date.csv")
import matplotlib.pyplot as plt
%matplotlib inline
trips = pd.read_csv("trips_date.csv")
trips.plot()
'Big Data > Engineering' 카테고리의 다른 글
01_03. RDD Transformations and Actions (2) | 2023.05.13 |
---|---|
01_02. 병렬처리에서 분산처리까지 (0) | 2023.05.12 |
[Hadoop] 하둡(Hadoop) 이론 (1) | 2023.05.11 |
00_04. 프로젝트 INTRO (1) | 2023.05.11 |
00_03. Dataflow Orchestration (0) | 2023.05.11 |
Comments