Download Curso Spark 01 Núcleo de Conceptos
Document related concepts
no text concepts found
Transcript
Curso Spark 01 Núcleo de Conceptos Resilient Distributed Dataset • Resilient Distributed Dataset (RDD) es una colección de objetos distribuida de forma inmutable. • Cada RDD es dividido en múltiples particiones , las cuales serán procesadas en diferentes nodos dentro del cluster. • Se puede crear RDDs de dos maneras, cargando un archivo de datos externo o mediante la distribución de una colección de objetos (ie. list o set) en el programa controlador. Resilient Distributed Dataset • ¿Cómo crearlo? • Colección de objetos existente Python lineas = sc.parellelize([“pandas”, “ositos”]) SCALA lineas = sc.parellelize(List(“pandas”, “ositos”)) Resilient Distributed Dataset • Archivo externo Python lineas = sc.parellelize(“/path/to/README.md”) SCALA lineas = sc.parellelize(“/path/to/README.md”) Operaciones con RDD • Se pueden realizar dos tipos de operaciones en SPARK: • • Transformaciones Acciones. • Las transformaciones son operaciones con RDD que dan como resultado un nuevo RDD. • Acciones son operaciones que devuelven resultados. Operaciones con RDD • Transformaciones • Dan como resultado un nuevo RDD. • Son ejecutados de forma lazy, es decir, son ejecutadas sólo cuando son utilizadas en una acción. • Muchas transformaciones son element wise, esto es, trabajan sobre un elemento a la vez. Operaciones con RDD • Transformaciones Ejemplos: Python inputRDD = sc.textFile(“log.txt”) errorsRDD = inputRDD.filter(lambda x: “error” in x) warningsRDD = inputRDD.filter(lambda x: “error” in x) badlinesRDD = errorRDD.union(warningsRDD) SCALA inputRDD = sc.textFile(“log.txt”) errorsRDD = inputRDD.filter(line => line.contains(“error”)) Operaciones con RDD • Acciones • Son operaciones que dan como resultado un valor final hacia el programa controlador o generan la instrucción de escritura al sistema externo de almacenamiento. • Las acciones forzan la evaluación de las transformaciones requeridas por el RDD. • Operaciones con RDD Acciones Ejemplo: Python print "Input had " + badLinesRDD.count() + " concerning lines" print "Here are 10 examples:" for line in badLinesRDD.take(10): print line SCALA println("Input had " + badLinesRDD.count() + " concerning lines") println("Here are 10 examples:") badLinesRDD.take(10).foreach(println) Operaciones con RDD • Evaluación Lazy • SPARK no ejecuta las transformaciones hasta que aparece una acción. • Al llamar una transformación sobre un RDD la operación no es ejecutada de forma inmediata. Spark sólo registra los metadatos para indicar la operación solicitada. • La ventaja de utilizar la evaluación lazy es agrupar operaciones. • En SPARK no hay beneficio alguno de crear un mapeo complejo en lugar de encadenar muchas operaciones simples. Operaciones con RDD • Pasar las funciones a SPARK • Python • Hay tres formas de pasar funciones en Python: • • • Expresiones lambda Funciones globales Funciones locales Operaciones con RDD • Funciones globales y locales. • Mediante el comando def se pueden definir funciones globales y locales. • Expresiones lambda. • Python permite la creación de funciones anónimas (funciones que no están sujetas a un nombre) al momento de ejecutarse, utilizando una construcción llamada “lambda”. • Las expresiones lambda se utilizan para funciones que sólo van a ser utilizadas una vez. Operaciones con RDD • Filtrado utilizando funciones lambda o funciones globales word = rdd.filter(lambda s: "error" in s) def containsError(s): return "error" in s word = rdd.filter(containsError) Operaciones con RDD • En SCALA se puede pasar funciones a través de los siguientes tres métodos: • • • Inline Referencias a métodos Funciones estáticas. Transformaciones • Las dos transformaciones más comunes son: • • map filter InputRDD {1,2,3,4} map x => x*x Mapped RDD {1,4,9,16} filter x => x != 1 Filtered RDD {2,3,4} Transformaciones • Python Valores al cuadrado en un RDD nums = sc.parallelize([1, 2, 3, 4]) squared = nums.map(lambda x: x * x).collect() for num in squared: print "%i " % (num) • Scala Valores al cuadrado en un RDD val input = sc.parallelize(List(1, 2, 3, 4)) val result = input.map(x => x * x) println(result.collect().mkString(",")) Transformaciones • flatmap • Produce elementos múltiples a partir de cada elemento introducido. • El ejemplo más común, llevar un párrafo a palabras. flatMap() en Python, separando líneas en múltiples palabras lines = sc.parallelize([“hola mundo ", "hi"]) words = lines.flatMap(lambda line: line.split(" ")) words.first() # resulta en "hola" flatMap() en Scala, separando líneas en múltiples palabras val lines = sc.parallelize(List(“hola mundo", "hi")) val words = lines.flatMap(line => line.split(" ")) words.first() // resulta "hola" Transformaciones • Diferencia entre map y flatmap rdd1.map(tokenize) Mapped RDD {[“hola”, “México” ], [”hola”, “DF” ], [”hola”, “comunidad”, “global” ]} RDD1 {“hola México”, ”hola DF”, ”hola comunidad global”} rdd1.flatMap(tokenize) Mapped RDD {[“hola” , ” México” , ”hola” , “DF” , ”hola” , “comunidad”, “global”] } Transformaciones • Operaciones Pseudo Conjunto (Pseudo Set Operations). RDD1 {beisból, beisból, futból, maratón, tenis} RDD2 {beisból, futból, basquetból} RDD1 .distinct() {beisból, futból, maratón, tenis} RDD1 .union(RDD2) {beisból, beisból, beisból, futból, futból, maratón, tenis, basquetból} RDD1 .intersection(RDD2) {beisból, futból} RDD1 .subtract(RDD2) {maratón, tenis} Transformaciones • Operaciones Pseudo Conjunto (Pseudo Set Operations). RDD1 {uno, dos, tres} RDD1.cartesian(RDD2) RDD2 {beisból, futból, basquetból} RDD1 .cartesian(RDD2) {(uno,beisból), (uno, futból), (uno, basquetból), (uno,beisból), (dos, futból), (dos, basquetból), (tres,beisból), (tres, futból), (tres, basquetból)} Acciones • El tipo de acción más común es reduce. Ejemplo reduce() in Python sum = rdd.reduce(lambda x, y: x + y) reduce() in Scala val sum = rdd.reduce((x, y) => x + y) • • fold realiza la misma acción que reduce pero permite incorporar un valor inicial igual a 0, lo cual es muy útil para listas vacías. Tanto fold como reduce requieren que el resultado sea del mismo tipo que los elementos del RDD sobre los que se ejecuta la acción. Acciones • • La función aggregate nos libera de la restricción de que el resultado sea del mismo tipo que el RDD sobre el que estamos trabajando. aggregate() en Python sumCount = nums.aggregate((0, 0), (lambda acc, value: (acc[0] + value, acc[1] + 1), (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))) return sumCount[0] / float(sumCount[1]) • aggregate() en Scala val result = input.aggregate((0, 0))( (acc, value) => (acc._1 + value, acc._2 + 1), (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)) val avg = result._1 / result._2.toDouble Acciones • collect – envía todos los registros a nuestro manejador. • take(n) – envía n elementos del RDD. El orden no es el esperado. • takeOrdered(num)(ordering) – retorna un número de elementos basado en el orden provisto. • top() – si los datos están ordenados un una forma determinada, esta instrucción permite extraer los elemento en la cima del RDD. • takeSample(withReplacement, num, seed) – Esta función nos permite tomar una muestra con o sin reemplazo. Acciones • count() – nos calcula el número de elementos. • countByValue – calcula un mapa con un conteo de cada valor único del RDD. • foreach – Permite ejecutar acciones en cada uno de los elementos en el RDD, pero no devuelve resultado alguno al manejador. Conversión entre RDD • Algunas funciones sólo están disponibles sobre determinadas formas de RDD. • En SCALA, estos métodos no están definidos sobre la clase estándar de RDD, para acceder a ellos nos debemos asegurar de que estamos tratando con la clase especializada correcta. Conversión entre RDD • En SCALA la conversión a RDD con funciones espaciales es manejada automáticamente utilizando conversiones implícitas. • Se debe agregar al programa import org.apache.spark.SparkContext._ para que los conversiones funcionen. • Estos implícitos convierten un RDD varias clases de wrapper, tales como DoubleRDDFunctions (para datos numéricos) y PairRDDFunctions (para pares key/value) para poder utilizar funciones tales como mean() y variance(). Conversión entre RDD • El API de Python está estructurado de manera distinta a SCALA. En Python todas las funciones son implementadas enbase a la clase del RDD pero fallarán en la ejecución si el tipo de datos en el RDD es incorrecto. Persistencia (Caching) • Los RDD de SPARK son evaluados de forma lazy. • En algunos casos deseamos hacer operaciones sobre un mismo RDD en múltiples ocasiones. • Si las operaciones se hacen por separado, SPARK las ejecutará cada vez que se invoque la acción sobre el RDD. • Esto puede ser muy costoso para algoritmos iterativos. Persistencia (Caching) • Para evitar el cómputo sobre un RDD en múltiples ocasiones, se le solicita a SPARK un persist de los datos. • Cuando se le solicita a SPARK un persist del RDD, los nodos que tienen el RDD almacenan las particiones. • Si un nodo que debe persistir los datos falla, SPARK recalculará las particiones perdidas cuando sea necesario. • También podemos replicar los datos en múltiples nodos si deseamos administrar la falla de nodos sin hacer más lento el servicio. Persistencia (Caching) • Spark tiene muchos niveles de persistencia para elegir dependiendo de nuestros objetivos. • En la siguiente tabla se muestran los nivles de persistencia de org.apache.spark.StorageLevel y pyspark.StorageLevel. • Si se desea se puede replicar los datos en dos máquinas al agregar _2 al final del nivel de almacenamiento. Persistencia (Caching) Nivel Espacio Tiempo En utilizado CPU memoria En disco MEMORY_ONLY Alto Bajo Si No MEMORY_ONLY_SER Bajo Alto Si No MEMORY_AND_DISK Alto Medio Algo Algo MEMORY_AND_DISK_SER Bajo Alto Algo Algo DISK_ONLY Bajo Alto No Si Comentarios Se vierte en disco si son demasiados datos para ser almacenados en memoria Se vierte en disco si son demasiados datos para ser almacenados en memoria. Guarda una representación serializada en memoria. Persistencia (Caching) • Ejemplo de persist con SCALA val result = input.map(x => x * x) result.persist(StorageLevel.DISK_ONLY) println(result.count()) println(result.collect().mkString(",")) Bibliografía • Holden Karau, Andy Konwinski, Patrick Wendell, y Matei Zaharia. “Learning SPARK” OREILLY 2015. • Core Spark functionality, http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark. package