A.I
Hadoop & Spark 본문
하둡 & 스파크¶
빅데이터 컴포넌트 - Hadoop ecosystem¶
데이터 수집(Data Ingestion)
- 스쿱(Sqoop) : RDBMS(오라클, MySQL등..)와 하둡 사이의 데이터를 이동시켜줍니다.
- 플럼(Flume) : 분산환경에서 대량의 로그데이터를 효과적으로 수집하여 합친 후 다른 곳으로 전송합니다.
데이터 처리(Data Processing)
- 하둡 분산파일시스템(HDFS): 하둡의 분산 처리 파일 시스템.
- 맵리듀스(MapReduce): Java기반의 맵리듀스 프로그래밍 모델입니다.
- 얀(Yarn): 하둡 클러스터의 자원(Resource)을 관리.
- 스파크(Spark): In-memory기반의 클러스터 컴퓨팅 데이터 처리입니다. 스파크 안에도 스파크 코어, 스파크SQ, Milib, GraphX과 같은 컴포넌트가 있습니다.
데이터 분석(Data Analysis)
- 피그(Pig): 맵리듀스로 실행하기 어려운 데이터 관련 작업, filter, join, query와 같은 작업을 실행합니다.
- 임팔라(Impala): 고성능의 SQL 엔진.
- 하이브(Hive): 임팔라와 유사한 SQL 관련 기능을 제공합니다.
데이터 검색(Data Exploration)
- 클라우데라 서치(Cloudera Search): real-time으로 데이터에 검색이 가능합니다.
- 휴(Hue): 웹 인터페이스 제공.
기타
- 우지(Oozie): 워크플로우 관리, Job 스케쥴러.
- HBase: NoSQL기반으로 HDFS에 의해 처리된 데이터를 저장합니다.
- 제플린(Zeppelin): 데이터 시각화.
- SparkMLlib, 머하웃(mahout): 머신러닝 관련 라이브러리.
빅데이터 컴포넌트 - Spark ecosystem¶
- 프로그래밍 언어: Scala, Java, Python, R, SQL
스파크 라이브러리
- Spark SQL: SQL 관련 작업
- Streaming: Streaming 데이터 처리
- MLlib: Machine Learning 관련 라이브러리
- GraphX: Graph Processing
자원관리(주로 클러스터 관리)는 하둡의 Yarn 또는 Mesos를 사용하거나, 또는 스파크 자체의 관리기능을 그대로 사용합니다.
- 데이터 저장(Storage)은 Local FS(File System)이나 하둡의 HDFS를 이용하거나 AWS의 S3 인스턴스를 이용하기도 합니다. (주로 Amazon S3를 많이 사용합니다. ) 그리고 기존의 RDBMS나 NoSQL을 사용하는 경우도 있습니다. 하둡의 HDFS같이 스파크의 전용 분산 데이터 저장 시스템을 별도로 가지고 있지 않다는 점에서, 스파크의 에코시스템이 가지는 유연한 확장성이 강조된 설계 사상을 확인할 수 있습니다.
Spark 데이터 처리 : RDD¶
RDD의 특징¶
- In-Memory
- Fault Tolerance
- Immutable(Read-Only)
- Partition [파티션]
RDD의 생성¶
- 내부에서 만들어진 데이터 집합을 병렬화하는 방법: parallelize()함수 사용
- 외부의 파일을 로드하는 방법: .textFile() 함수 사용
RDD의 동작¶
- Transformations = RDD에게 변형 방법(연산 로직, 계보, lineage)을 알려주고 새로운 RDD를 만든다
- Actions = 실제 연산
RDD 동작 순서¶
- sc.textFile()을통해 RDD를 생성, 실제 객체는 생성되지 않았다
- 그리고 transformations함수 중 하나인 filter()를 만든다
- 실제 RDD가 생성되는 시점은 Actions의 함수인 counts()를 실행할 때
결괏값이 필요할 때까지 계산을 늦추다가 정말 필요한 시기에 계산을 수행하는 방법을 느긋한 계산법(Lazy evaluation)이라고 한다
PySpark¶
PySpark 설치¶
- 필요한 라이브러리 버전
- java (>= 8.0)
- Spark (>= 2.2.0)
- Python (>= 3.4.0)
자바 설치 - sudo apt-get install openjdk-8-jdk-headless -y
스파크 설치
- wget https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
- tar xf spark-3.0.1-bin-hadoop2.7.tgz
- cd spark-3.0.1-bin-hadoop2.7
- cd bin
- ./spark-shell
- pip install pyspark
SparkContext를 통한 스파크 초기화¶
문법: pyspark.SparkContext()
스파크기능의 기본 엔트리포인트입니다.
스파크 클러스터와 연결을 나타내며 RDD를 만들고 브로드캐스트하는데 사용될 수 있습니다.
JVM 당 하나만 활성화해야 하며, 새로운 것을 만들기 전에는 활성을 중지해야 합니다
In [1]:
from pyspark import SparkConf, SparkContext
sc = SparkContext()
sc
Out[1]:
In [2]:
type(sc)
Out[2]:
pyspark.context.SparkContext
In [3]:
sc.stop()
In [4]:
sc = SparkContext(master='local', appName='PySpark Basic')
sc
Out[4]:
In [5]:
sc.getConf().getAll()
Out[5]:
[('spark.master', 'local'), ('spark.app.startTime', '1615351951279'), ('spark.app.name', 'PySpark Basic'), ('spark.rdd.compress', 'True'), ('spark.serializer.objectStreamReset', '100'), ('spark.app.id', 'local-1615351951339'), ('spark.driver.port', '43791'), ('spark.submit.pyFiles', ''), ('spark.executor.id', 'driver'), ('spark.submit.deployMode', 'client'), ('spark.driver.host', '192.168.219.101'), ('spark.ui.showConsoleProgress', 'true')]
In [6]:
sc.master
Out[6]:
'local'
In [7]:
sc.appName
Out[7]:
'PySpark Basic'
In [8]:
sc.stop()
In [9]:
conf = SparkConf().setAppName('PySpark Basic').setMaster('local')
sc = SparkContext(conf=conf)
sc
Out[9]:
RDD Creation¶
내부에서 만들어진 데이터 집합을 병렬화¶
In [10]:
rdd = sc.parallelize([1,2,3])
rdd
Out[10]:
ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274
In [11]:
type(rdd)
Out[11]:
pyspark.rdd.RDD
In [12]:
# RDD의 원소 반환
rdd.take(3)
Out[12]:
[1, 2, 3]
외부의 파일을 로드¶
In [13]:
import os
file_path = os.getenv('HOME')+'/aiffel/bigdata_ecosystem/test.txt'
with open(file_path, 'w') as f:
for i in range(10):
f.write(str(i)+'\n')
print('OK')
OK
In [14]:
rdd2 = sc.textFile(file_path)
print(rdd2)
print(type(rdd2))
/home/ssac24/aiffel/bigdata_ecosystem/test.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0 <class 'pyspark.rdd.RDD'>
In [15]:
rdd2.take(3)
Out[15]:
['0', '1', '2']
RDD Operation (1) Transformations¶
Map()¶
In [16]:
x = sc.parallelize(["b", "a", "c", "d"])
y = x.map(lambda z: (z, 1))
print(x.collect()) #collect()는 actions입니다.
print(y.collect())
['b', 'a', 'c', 'd'] [('b', 1), ('a', 1), ('c', 1), ('d', 1)]
In [17]:
nums = sc.parallelize([1, 2, 3])
squares = nums.map(lambda x: x*x)
print(squares.collect())
[1, 4, 9]
Filter()¶
In [18]:
x = sc.parallelize([1,2,3,4,5])
y = x.filter(lambda x: x%2 == 0)
print(x.collect())
print(y.collect())
[1, 2, 3, 4, 5] [2, 4]
In [19]:
text = sc.parallelize(['a', 'b', 'c', 'd'])
capital = text.map(lambda x: x.upper())
A = capital.filter(lambda x: 'A' in x)
print(text.collect())
print(A.collect())
['a', 'b', 'c', 'd'] ['A']
Flatmap()¶
In [20]:
x = sc.parallelize([1,2,3])
y = x.flatMap(lambda x: (x, x*10, 30))
print(x.collect())
print(y.collect())
[1, 2, 3] [1, 10, 30, 2, 20, 30, 3, 30, 30]
In [21]:
wordsDataset = sc.parallelize(["Spark is funny", "It is beautiful", "And also It is implemented by python"])
result = wordsDataset.flatMap(lambda x: x.split()).filter(lambda x: x != " ").map(lambda x: x.lower())
# 공백은 제거합니다.
# 단어를 공백기준으로 split 합니다.
result.collect()
Out[21]:
['spark', 'is', 'funny', 'it', 'is', 'beautiful', 'and', 'also', 'it', 'is', 'implemented', 'by', 'python']
CSV파일 읽기¶
- wget https://storage.googleapis.com/tf-datasets/titanic/train.csv
- mv train.csv ~/aiffel/bigdata_ecosystem
In [22]:
import os
csv_path = os.getenv('HOME')+'/aiffel/bigdata_ecosystem/train.csv'
csv_data_0 = sc.textFile(csv_path)
csv_data_0.take(5)
Out[22]:
['survived,sex,age,n_siblings_spouses,parch,fare,class,deck,embark_town,alone', '0,male,22.0,1,0,7.25,Third,unknown,Southampton,n', '1,female,38.0,1,0,71.2833,First,C,Cherbourg,n', '1,female,26.0,0,0,7.925,Third,unknown,Southampton,y', '1,female,35.0,1,0,53.1,First,C,Southampton,n']
In [23]:
# 비어있는 라인은 제외하고, delimeter인 ,로 line을 분리해 줍니다.
csv_data_1 = csv_data_0.filter(lambda line: len(line)>1).map(lambda line: line.split(","))
csv_data_1.take(5)
Out[23]:
[['survived', 'sex', 'age', 'n_siblings_spouses', 'parch', 'fare', 'class', 'deck', 'embark_town', 'alone'], ['0', 'male', '22.0', '1', '0', '7.25', 'Third', 'unknown', 'Southampton', 'n'], ['1', 'female', '38.0', '1', '0', '71.2833', 'First', 'C', 'Cherbourg', 'n'], ['1', 'female', '26.0', '0', '0', '7.925', 'Third', 'unknown', 'Southampton', 'y'], ['1', 'female', '35.0', '1', '0', '53.1', 'First', 'C', 'Southampton', 'n']]
In [24]:
columns = csv_data_1.take(1)
columns
Out[24]:
[['survived', 'sex', 'age', 'n_siblings_spouses', 'parch', 'fare', 'class', 'deck', 'embark_town', 'alone']]
In [25]:
csv_data_2 = csv_data_1.filter(lambda line: line[0].isdecimal()) # 첫 번째 컬럼이 숫자인 것만 필터링
csv_data_2.take(5)
Out[25]:
[['0', 'male', '22.0', '1', '0', '7.25', 'Third', 'unknown', 'Southampton', 'n'], ['1', 'female', '38.0', '1', '0', '71.2833', 'First', 'C', 'Cherbourg', 'n'], ['1', 'female', '26.0', '0', '0', '7.925', 'Third', 'unknown', 'Southampton', 'y'], ['1', 'female', '35.0', '1', '0', '53.1', 'First', 'C', 'Southampton', 'n'], ['0', 'male', '28.0', '0', '0', '8.4583', 'Third', 'unknown', 'Queenstown', 'y']]
In [26]:
csv_data_3 = csv_data_2.map(lambda line: [(columns[0][i], linedata) for i, linedata in enumerate(line)])
csv_data_3.take(5)
Out[26]:
[[('survived', '0'), ('sex', 'male'), ('age', '22.0'), ('n_siblings_spouses', '1'), ('parch', '0'), ('fare', '7.25'), ('class', 'Third'), ('deck', 'unknown'), ('embark_town', 'Southampton'), ('alone', 'n')], [('survived', '1'), ('sex', 'female'), ('age', '38.0'), ('n_siblings_spouses', '1'), ('parch', '0'), ('fare', '71.2833'), ('class', 'First'), ('deck', 'C'), ('embark_town', 'Cherbourg'), ('alone', 'n')], [('survived', '1'), ('sex', 'female'), ('age', '26.0'), ('n_siblings_spouses', '0'), ('parch', '0'), ('fare', '7.925'), ('class', 'Third'), ('deck', 'unknown'), ('embark_town', 'Southampton'), ('alone', 'y')], [('survived', '1'), ('sex', 'female'), ('age', '35.0'), ('n_siblings_spouses', '1'), ('parch', '0'), ('fare', '53.1'), ('class', 'First'), ('deck', 'C'), ('embark_town', 'Southampton'), ('alone', 'n')], [('survived', '0'), ('sex', 'male'), ('age', '28.0'), ('n_siblings_spouses', '0'), ('parch', '0'), ('fare', '8.4583'), ('class', 'Third'), ('deck', 'unknown'), ('embark_town', 'Queenstown'), ('alone', 'y')]]
In [27]:
# CSV파일을 DataFrame으로 읽는 방법
from pyspark import SparkConf, SparkContext, SQLContext
url = 'https://storage.googleapis.com/tf-datasets/titanic/train.csv'
from pyspark import SparkFiles
sc.addFile(url)
sqlContext = SQLContext(sc)
df = sqlContext.read.csv(SparkFiles.get("train.csv"), header=True, inferSchema= True)
df.show(5, truncate = False)
+--------+------+----+------------------+-----+-------+-----+-------+-----------+-----+ |survived|sex |age |n_siblings_spouses|parch|fare |class|deck |embark_town|alone| +--------+------+----+------------------+-----+-------+-----+-------+-----------+-----+ |0 |male |22.0|1 |0 |7.25 |Third|unknown|Southampton|n | |1 |female|38.0|1 |0 |71.2833|First|C |Cherbourg |n | |1 |female|26.0|0 |0 |7.925 |Third|unknown|Southampton|y | |1 |female|35.0|1 |0 |53.1 |First|C |Southampton|n | |0 |male |28.0|0 |0 |8.4583 |Third|unknown|Queenstown |y | +--------+------+----+------------------+-----+-------+-----+-------+-----------+-----+ only showing top 5 rows
In [28]:
# 위에서 얻은 데이터에서 40세 이상인 사람들의 데이터만 필터링해 봅시다.
df2 = df[df['age']>40]
df2.show(5, truncate = False)
+--------+------+----+------------------+-----+-------+------+-------+-----------+-----+ |survived|sex |age |n_siblings_spouses|parch|fare |class |deck |embark_town|alone| +--------+------+----+------------------+-----+-------+------+-------+-----------+-----+ |0 |male |66.0|0 |0 |10.5 |Second|unknown|Southampton|y | |0 |male |42.0|1 |0 |52.0 |First |unknown|Southampton|n | |1 |female|49.0|1 |0 |76.7292|First |D |Cherbourg |n | |0 |male |65.0|0 |1 |61.9792|First |B |Cherbourg |n | |0 |male |45.0|1 |0 |83.475 |First |C |Southampton|n | +--------+------+----+------------------+-----+-------+------+-------+-----------+-----+ only showing top 5 rows
RDD Operation (2) Actions¶
collect¶
In [29]:
# RDD 내의 모든 값을 리턴
nums = sc.parallelize(list(range(10)))
nums.collect()
Out[29]:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
take¶
In [30]:
# 앞쪽 n개의 데이터의 list를 리턴
nums.take(3)
Out[30]:
[0, 1, 2]
count¶
In [31]:
# RDD에 포함된 데이터 개수를 리턴
nums.count()
Out[31]:
10
reduce¶
In [32]:
nums.reduce(lambda x, y: x + y)
Out[32]:
45
saveAsTextFile¶
In [34]:
# RDD 데이터를 파일로 저장
file_path = os.getenv('HOME')+'/aiffel/bigdata_ecosystem/file1.txt'
nums.saveAsTextFile(file_path)
!ls -l ~/aiffel/bigdata_ecosystem
합계 44 drwxr-xr-x 2 ssac24 ssac24 4096 3월 10 14:12 file.txt drwxr-xr-x 2 ssac24 ssac24 4096 3월 10 14:13 file1.txt -rw-rw-r-- 1 ssac24 ssac24 20 3월 10 13:57 test.txt -rw-rw-r-- 1 ssac24 ssac24 30874 2월 21 2019 train.csv
In [37]:
# RDD 생성
rdd = sc.parallelize(range(1,100))
# RDD Transformation
rdd2 = rdd.map(lambda x: 0.5*x - 10).filter(lambda x: x > 0)
# RDD Action
rdd2.reduce(lambda x, y: x + y)
Out[37]:
1580.0
RDD Operation (3) 실습:MapReduce¶
1. Word Counter 구현¶
In [38]:
text = sc.parallelize('hello python')
# map 함수를 적용한 RDD 구하기
text_1 = text.filter(lambda x: x != " ")
text_2 = text_1.map(lambda x:(x, 1))
#reduceByKey 함수를 적용한 Word Counter 출력
word_count = text_2.reduceByKey(lambda accum, n: accum + n)
word_count.collect()
Out[38]:
[('h', 2), ('e', 1), ('l', 2), ('o', 2), ('p', 1), ('y', 1), ('t', 1), ('n', 1)]
2. Titanic 데이터 분석¶
In [39]:
csv_path = os.getenv('HOME')+'/aiffel/bigdata_ecosystem/train.csv'
csv_data_0 = sc.textFile(csv_path)
csv_data_1 = csv_data_0.filter(lambda line: len(line)>1).map(lambda line: line.split(","))
columns = csv_data_1.take(1)
csv_data_2 = csv_data_1.filter(lambda line: line[0].isdecimal())
csv_data_3 = csv_data_2.map(lambda line: [(columns[0][i], linedata) for i, linedata in enumerate(line)])
csv_data_3.take(3)
Out[39]:
[[('survived', '0'), ('sex', 'male'), ('age', '22.0'), ('n_siblings_spouses', '1'), ('parch', '0'), ('fare', '7.25'), ('class', 'Third'), ('deck', 'unknown'), ('embark_town', 'Southampton'), ('alone', 'n')], [('survived', '1'), ('sex', 'female'), ('age', '38.0'), ('n_siblings_spouses', '1'), ('parch', '0'), ('fare', '71.2833'), ('class', 'First'), ('deck', 'C'), ('embark_town', 'Cherbourg'), ('alone', 'n')], [('survived', '1'), ('sex', 'female'), ('age', '26.0'), ('n_siblings_spouses', '0'), ('parch', '0'), ('fare', '7.925'), ('class', 'Third'), ('deck', 'unknown'), ('embark_town', 'Southampton'), ('alone', 'y')]]
In [40]:
# 생존자와 사망자의 연령 총합 구하기
csv_data_4 = csv_data_3.map(lambda line:(line[0][1], line[2][1])) # (생존여부, 연령)
age_sum_data = csv_data_4.reduceByKey(lambda accum, age: float(accum) + float(age))
age_sum = age_sum_data.collect()
# 생존자와 사망자의 사람 수 구하기
csv_data_5 = csv_data_3.map(lambda line:(line[0][1], 1))
survived_data = csv_data_5.reduceByKey(lambda accum, count: int(accum) + int(count))
survived_count = survived_data.collect()
age_sum_dict = dict(age_sum)
survived_dict = dict(survived_count)
avg_age_survived = age_sum_dict['1']/survived_dict['1']
print('생존자 평균 연령:' ,avg_age_survived)
avg_age_died = age_sum_dict['0']/survived_dict['0']
print('사망자 평균 연령:' ,avg_age_died)
생존자 평균 연령: 29.110411522633743 사망자 평균 연령: 29.9609375
'파이썬 & AI 학습' 카테고리의 다른 글
파이썬 데이터베이스 만들기 Pandas (0) | 2021.03.14 |
---|---|
딥러닝 레이어에 대한 이해 (0) | 2021.03.12 |
Linear, Convolution layer (0) | 2021.03.06 |
MapReduce 원리 (0) | 2021.03.04 |
TensorFlow v2 다뤄보기 (0) | 2021.02.26 |