스파크는 키/값 쌍을 가지고 있는 RDD에 대해 특수한 연산들을 제공한다. 이 RDD들은 페어 RDD라고 불리는데, Pair RDD 들은 각 키에 대해 병렬로 처리하거나 네트워크 상에서 데이터를 다시 그룹핑하는 역할을 한다. 예를 들어, reduceByKey(), join() 등의 메소드들을 갖고 있다.
일반 RDD를 Pair RDD로 만들기 위해서는 키/값 쌍을 되돌려주는 map()
함수를 써서 변환할 수 있다. 키/값 RDD를 생성하는 방법은 언어마다 다르다.
val pairs = lines.map(x => (x.split(" ")(0), x))
- 스칼라에서는 키를 가지는 데이터를 위한 함수들을 위해 튜플을 리턴해야 한다. 튜플의 RDD에는 추가적인 키/값 함수들을 제공하기 위해 묵시적인 변환이 있다
- 메모리에 있는 데이터로부터 페어 RDD를 만드려면 페어 데이터세트에
SparkContext.parallelize()
를 호출하면 된다
####자바에서 첫번째 단어를 키로 사용한 페어 RDD 생성
PairFunction<String, String, String> keyData =
new PairFunction<String, String, String>() {
public Tuple2<String, String> call(String x) {
return new Tuple2(x.split(" ")[0], x);
}
};
JavaPairRDD<String, String> pairs = lines.mapToPair(keyData);
- 자바는 내장 튜플 타입이 없으므로 스파크의 자바 API에서
scala.Tuple2
클래스를 사용하여 사용자가 튜플을 만들어야 한다 - 메모리에 있는 데이터로부터 페어 RDD를 만드려면 페어 데이터세트에
SparkContext.parallelizePairs()
를 호출하면 된다
Pair RDD는 기본 RDD에서 가능한 모든 transformation을 사용할 수 있다.
함수 이름 | 목적 | 예 | 결과 |
---|---|---|---|
reduceByKey(func) | 동일 키에 대한 값들을 합친다 | rdd.reduceByKey((x, y) => x + y) | {(1, 2), (3, 10)} |
groupByKey() | 동일 키에 대한 값들을 그룹화한다 | rdd.groupByKey() | {(1, [2]), (3, [4, 6])} |
combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner) | 다른 결과 타입을 써서 동일 키의 값들을 합친다 | ||
mapValues(func) | 키의 변경 없이 pairRDD의 각 값에 함수를 적용한다 | rdd.mapValues(x => x + 1) | {(1, 3), (3, 5), (3, 7)} |
flatMapValues(func) | pairRDD의 각 값에 대해 반복자를 리턴하는 함수를 적용하고, 리턴 받은 값들에 대해 기존 키를 써서 키/값 쌍을 만든다. 종종 토큰 분리에 쓰임 |
rdd.flatMapValues(x => (x to 5)) | {(1, 2), (1, 3), (1, 4), (1, 5), (3, 4), (3, 5)} |
keys() | RDD가 가진 키들만을 되돌려 준다 | rdd.keys() | {1, 3, 3} |
values() | RDD가 가진 값들만을 되돌려 준다 | rdd.values() | {2, 4, 6} |
sortByKey() | 키로 정렬된 RDD를 되돌려 준다 | rdd.sortByKey() | {(1, 2), (3, 4), (3, 6)} |
함수 이름 | 목적 | 예 | 결과 |
---|---|---|---|
subtractByKey | 다른 쪽 RDD에 있는 키를 써서 RDD의 데이터를 삭제한다 | rdd.subtractByKey(other) | {(1, 2)} |
join | 두 RDD에 대해 inner join을 수행한다 | rdd.join(other) | {(3, (4, 9)), (3, (6, 9))} |
rightOuterJoin | 첫 번째 RDD에 있는 키들을 대상으로 두 RDD간에 조인을 수행한다 | rdd.rightOuterJoin(other) | {(3, (Some(4), 9)) , (3, (Some(6), 9))} |
leftOuterJoin | 다른쪽 RDD에 있는 키들을 대상으로 두 RDD간에 조인을 수행한다 | rdd.leftOuterJoin(other) | {(1, (2, None)), (3, (4, Some(9))), (3, (6, Some(9)))} |
cogroup | 동일 키에 대해 양쪽 RDD를 그룹화 | rdd.cogroup(other) | {(1, ([2], [])), (3, ([4, 6], [9]))} |
- pairRDD도 여전히 일반적인 RDD이므로 RDD에서 지원하는 함수들을 그대로 지원함
pairs.filter{case (key, value) => value.length < 20}
[출처] : https://www.oreilly.com/library/view/learning-spark/9781449359034/ch04.html
데이터세트가 키/값 쌍으로 표현될 때 동일 키에 대해 집계된 통계를 산출하는 작업은 매우 흔한 일이어서, 스파크는 각 키에 대해 값들을 합쳐주는 유사한 종류의 연산을 제공한다. 이러한 연산들은 RDD를 리턴하므로, 액션보다는 트랜스포메이션이라고 할 수 있다.
reduce()
와 매우 유사- 함수를 받아서 값들을 합치는 데 이용
- 여러 개의 병합 연산을 실행하는데, 하나의 연산은 하나의 키에 대한 작업이 되고, 각 작업은 동일 키에 대한 값을 하나로 합침
- 데이터세트의 키가 매우 많을 수도 있으므로, 값 하나를 사용자에게 되돌려주는 액션으로 구현되는 것이 아니라, 각 키와 키에 대해 합쳐진 값으로 구성된 새로운 RDD를 리턴
fold()
와 유사- RDD의 데이터 타입과 동일한 zeroValue와 함께 값이 병합되는 함수를 필요로 함
- 제공되는 zeroValue는 다른 데이터값과 병합 함수에서 합쳐질 때 값의 변경이 일어나면 안됨
rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
[출처] : https://www.oreilly.com/library/view/learning-spark/9781449359034/ch04.html
- 그림은 평균을 구하기 직전의 상태 (값의 총합, 값의 개수) 형태
- 평균을 구하려면, 총 합을 개수로 나누면 평균값을 구할 수 있음
val input = sc.textFile("s3://...")
val words = input.flatMap(x => x.split(" "))
val result = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
- 키/값 집합 연산 함수 중 가장 일반적으로 쓰임
- 대부분의 다른 키 별 컴파이너들은 이를 기반으로 구현됨
- 입력 데이터와 동일한 타입의 값을 되돌려 줄 필요는 없음
- 처리하는 각 데이터를 어떤 식으로 다루는지 생각해야 함
- 한 파티션 내의 데이터들을 하나씩 처리하게 되며, 각 데이터는 이전에 나온 적이 없는 키를 가지고 있을 수도 있고, 이전 데이터와 같은 키를 가질 수도 있음
- 새로운 데이터가 들어왔을 때 combineByKey() 의 데이터 처리 방식
- 넘겨준
createCombiner()
를 사용하여 해당 키에 대한 accumulator (스파크에서는 계속 관리가 될 필요가 있는 값을 의미) 의 초기값을 만든다 - 해당 작업은 RDD 전체 기준으로 첫 키가 나올 때가 아닌 각 파티션에서 처음 나오는 키마다 실행
- 넘겨준
- 파티션을 처리하는 도중 출현한 적이 있는 값
mergeValue()
함수를 해당 키에 대한 accumulator 현재 값과 새로운 값에 적용해서 합침
- 각 파티션에서 독립적으로 작업이 이루어지므로 동일 키에 대해 여러 개의 accumulator를 가질 수도 있음
- 각 파티션으로부터 결과를 최종적으로 합칠 때 둘 이상의 파티션이 동일 키에 대한 accumulator를 갖고 있다면 이 accumulator들은 사용자가 제공한 함수인
mergeCombiners()
를 써서 합쳐짐
val result = input.combineByKey(
(v) => (v, 1),
(acc: (Int, Int), v) => (acc._1, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int) => (acc1._1 + acc2._1, acc1._2 + acc2._2) ).map{ case (key, value) => (key, value._1 / value._2.toFloat) }
result.collectAsMap().map(println(_)))
[출처] : https://www.oreilly.com/library/view/learning-spark/9781449359034/ch04.html
- 키에 의해 데이터를 병합하는 것에는 많은 방법이 있다
- 대부분은
combineByKey()
기반으로 구현하지만 더욱 간단한 인터페이스도 제공하며, 어떤 방법을 선택하든 스파크에서 제공하는 특화된 집합 연산 함수들을 사용하는 것이 데이터를 그룹화하고 병합 처리하는 원초적인 접근 방식보다는 훨씬 빠르다
- 홀든 카로, 앤디 콘빈스키, 패트릭 웬델, 마테이 자하리아, 『러닝 스파크』, 제이펍(2015), p.59~ p.57