Friday, June 05, 2015

Spark data processing/analytics platform


At work we are looking to leverage a large scale data processing/analytics platform called Spark.   Before doing some hands-on work, I will always do some research to help me get started and have better insights and context on a large scale platform.   So this post summarises these notes on one of the hottest techno on Big-Data which officially past the-peak-of-inflated-expectation!  

Spark is a Data analytic/processing Platform optimised for iterative algorithm and interactive data mining and batched analytics.    

Spark provides a unified platform leveraging MapReduce (MR) parallel data processing capabilities while avoiding some limitations associated to the ancestor open source platform Hadoop.   Spark came out of functional programming model like MR, however it is also a broader generalisation of that model as more than just MR-type tasks are supported.

Underneath Spark's platform architecture lies what is called Spark execution engine.  On top of that you have extra modules like Shark for SQL capability, Spark streaming, MLib for machine learning, GraphX for Graph-based computation, and language-specific libraries (3 supported: scala, java, and python).   See here for an example of a platform integration stack of one vendor.

Also a key component is its Data storage abtraction model called Resilient Distributed Data (RDD) which was designed for coarsed-grained data transformations operating over the majority of data elements.  Consequently it is not suited to operate asynchronously over fine-grained share data, typical of RDBMS.  



Core Primitives

RDD: 

RDD can be qualified as the cornerstone of Spark data management.   In essence, these are data abstraction for immutable and fault-tolerant collections of elements built explicitly for parallelism done on a cluster.   Immutability provides support to have a more efficient fault-tolerant system critical in distributed environment.

Data is processed by chaining succession of RDD's transformation each performing bulk writes on a new RDD, each of which can be easily recovered by reapplying the same transformation definition in case of node failure or straggle (slow node).   These lead to an important property called lineage obtained directly from the resulting directed acyclic graph (DAG) of all transformation definition.  

From this DAG lineage property, it follows that any RDD could be easily re-created after a node failure by regenerating source RDDs.

Simple DAG showing RDD lineage


RDD persistence is controllable, so it is up to the user to decide which steps should be cached when we may need future re-use and/or better performance.

Cached RDD are stored as Java objects for fast access within JVM, but can also be stored on disk in serialised form (could have a smaller footprint than the Java object) in case where memory in nodes becomes scarce.

Key-Value RDD or simply pair RDD, are specialized RDD to support KV pairs, where each element of the RDD is a pair tuple (k,v), hence referred to as "pair RDD".


RDD Persistence

Every RDD can further be persisted in different way, offering a sort of data hook for re-use in other context.   These are very convenient when doing ad-hoc data analysis or machine learning where same state of a dataset can be cached and re-used in various ways.   This contrasts with the more batch-oriented synchronous operations typical of Hadoop.

Storage is characterised by:
  1. The Storage levels (memory-only, memory and disk)
  2. Explicit API calls (persist(), cache())
When deciding to persist or cache, one must consider the gain obtained compared to a full re-computation and the total memory size needed vs available in nodes.


So in a nutshell, RDD has these properties :

  • immutable once constructed
  • track lineage information to be recomputed in case of lost data or node failure
  • enable operations on collection of elements in parallel

and can be instantiated:

  • by paralellizing existing collection (ex. after generating a list in Python, Spark will split it into many partitions and distribute these among the nodes) 
  • by transforming any existing RDDs (sourcing)
  • from input files (in various storage system and format) 


and also user controls behaviour through two properties:

  1. its distribution property (number of nodes to be partitioned into) 
  2. its persistence (whether or not data should be cache for faster downstream consumption). 



Operation:  


The operations defined on RDD are either Transformation or Action.

Transformation consist of some operations (map, filter, distinct, union, grouping) happening on a given RDD and leading up to the generation of a new RDD node (from-to node).   All transformations are lazy operations that get executed only when necessary (i.e. when an action is triggered or a reduce operation is necessary).

Action consist of applying operations over all elements of one RDD (associative function like reduce or others like count, first, take sample, forEeach, ...) and producing some value output.

conceptual role of Transformations and Actions 




Data source 

Spark can interact with various input format frequently seen in high-end analytics platform:
  1. Local files (file://...), directory or SequenceFile
  2. Hadoop Distributed FileSystem (hdfs://...)
  3. HBase 
  4. S3

Closure:

These are the function literal defined in Transformation or Action.  These closures are represented as Java object which are pushed to all Workers by Spark (after serialization/deserialization).  There will be one Closure per Worker and these are sent one way: driver --> worker.

Same method is used to push any global variable as well, although this is not ideal to share information/state among Workers and back to driver.  To share state, there are better approach like special variables called Accumulators and BroadcastVariable.

Shared Variables:

Another abstraction is the concept of Shared Variable that are used for parallel operations.  These are used to share state information (through variable) between nodes themselves and/or between nodes and the driver.

Broadcast variable concept
These variables are designed to help optimise sharing global resource throughout all nodes.   They are read-only variables cached at each node avoiding the need to ship copies during tasks.   Spark will efficiently distribute these as needed with the objective to reduce global communication cost.

This can be used, for example, when we want to score a predictive model applied in parallel to a very large distributed dataset.  This model should then be broadcast to all involved nodes prior to scoring.

Accumulators
These can only be added through an associative operations (to define ..).   They are only readable by the driver program although workers are changing it.  This is supported out-of-the-box for either numeric type or standard mutable collections.   To support other types one need to develop an extension.

These are useful when we need global counters and sums done in parallel.


Spark Program lifecycle:


Spark's code normally follows a series of steps: the data source definition (input RDD), some transformations applied on each RDD's and finally the Actions used to retrieve the needed information:

  1. Get a SparkContext which is the entry point to all functionalities (this is implicit in interactive Shell env with the "sc" variable)
  2. Creation of source RDDs 
    • generating a dataset manually (sc.parallelize([collections...]))
    • sourcing from file (file://..), or HDFS (hdfs://...)
    • sourcing from Hadoop Input (sc.hadoopFile()) 
  3.  Creation of a series of Transformation happening sequentially from source
    • map(), flatMap(), filter(), reduceByKey(), groupByKey(), join()
  4.  Creation of Actions to return some values to the Driver
    • collect(), take(), count(), reduce(), saveAsText()

All these steps are lazily defined and only Action will trigger any processing.   Spark will go over these steps for each new job definition :
  1. Creation of the DAG with all RDD's lineage definition 
    • a direct mapping exist between the DAG and the user code
  2. Creation of a logical Execution plan optimised for the DAG 
    • Spark will aggressively pipeline operations, when these can be generated independently of other steps (ex. a read and map task can be fused together) 
    • Spark will also split tasks into stages to better exploit pipelining and caching
  3. Schedule and Execution of individual tasks according to this logical Execution plan (see next, runtime engine model).

Runtime Engine model

Spark applications involve the execution of independent sets of processes over a cluster.  This requires quite a bit of coordination which is handled by what s called the SparkContext (SC) object.  This object is running inside the Driver program, the main entry point where any Spark application is launched.  The SC can connect to many types of cluster manager (ex. Spark own standalone, Meso, YARN, ..) which handle resource allocation over the cluster.   

After connecting, Spark will acquire Executor on each Working node.   These are run in a single JVM instance located on cluster node and spawned to serve Spark application.  Executor's role is to run computation and to store data done via Spark Tasks  (smallest unit of work) executing in different thread inside the JVM.   Executor remain active as long as the Spark application is running, even when no jobs are running.   This feature allows Spark to start up Task very quickly and process in-memory data at speed not possible with Hadoop.    

Once Executor are spawned inside each Working node, Spark will send the application code to these executors.  This application code can be JAR files or yet python files depending on how the Application was coded inside the SC.  Then finally SC can send tasks for each executors to run. 
Source: "http://stanford.edu/~rezab/sparkclass/slides/itas_workshop.pdf"

Important properties of this model : No communication ever happen betweens Workers!


Task scheduling

Job scheduler's role is to run user-defined job as a sequence of stages that takes into account partition having persistent RDD in memory, possibility to pipeline transformations (individual task) with narrow dependency, separating transformation with wide dependencies (require shuffling).   Scheduler also consider data locality and preferred location.   If a task needs to process a partition available on a specific node, the scheduler will send it to that node, or if a task processes a partition for which the containing RDD provides preferred location (typical of HDFS file), again the scheduler will send it to those.

For example, this "simple" job definition:

rdd1.map(splitlines).filter("ERROR")
rdd2.map(splitlines).groupBy(key)
rdd2.join(rdd1, key).take(10)

Will yield the following DAG flow:
Source: databricks (https://databricks.com/blog/2014/03/20/apache-spark-a-delight-for-developers.html)

After examination of this DAG, the Scheduler defines stages to execute in a way to combine or pipeline all narrow transformations.  Here, RDD A-B would be stage-1 and RDD C-D-E stage-2 and the final join would be stage-3.  


In summary, Scheduling has these roles and responsibilities:
  • running all task in graph (the "full" DAG in proper sequence)
  • pipelining functions when possible (ex.  a map() and filter() transformations could be pipelined together and done in one stage, as a way to avoid processing later discarded data elements)
  • reusing cache-aware data and exploiting data locality
  • being partition-aware to reduce shuffle


Main Differentiation points:


The main selling points are actually Hadoop's biggest drawbacks that lead to the creation of many extension and add-on built around Hadoop ecosystem.   Hadoop biggest complain relate to its inefficiency toward iterative computing and interactive querying (due to low latency dispatcher), its Disk-only based approach, and its complex and verbose coding (complex algo only possible through chaining many individual MR jobs).  

By contrast, Spark offers:
  • Easy to develop environment
    • A rich API using Scala, Python or Java 
    • Leverage a real functional language namely Scala which makes it more aligned to manipulate MR (MapReduce) programming model (they figure it would minimise impedance with this programming model not directly accessible with pre-1.8 Java).
    • Scala and Python being interpreted languages, this allows Spark to have its own interactive shell which is so useful in interactive analysis (pyspark and spark_shell). I'd say this time-saver has become the norm now. 
  •  Fast to run platform
    • The general execution graph (DAG) concept is key in providing speed and lazy transformation
    • In-memory (cache) and persistent storage is really allowing the platform to move away from batch-oriented paradigm to more realtime and iterative workflow.

Versatile deployment configuration with many possible installation options.  For example, the Cluster Manager type can be deployed on these scheme :
  1. Standalone local (to run simple demo/prototyping)
  2. Standalone Cluster-based  (ex. running on same nodes as your Hadoop HDFS-based cluster) 
  3. Yarn-based 
  4. Mesos-based  


Martin

1 comment:

Unknown said...

I really appreciate information shared above. It’s of great help. If someone want to learn Online (Virtual) instructor lead live training in Apache spark mlib, kindly contact us http://www.maxmunus.com/contact
MaxMunus Offer World Class Virtual Instructor led training on Apache spark mlib. We have industry expert trainer. We provide Training Material and Software Support. MaxMunus has successfully conducted 100000+ trainings in India, USA, UK, Australlia, Switzerland, Qatar, Saudi Arabia, Bangladesh, Bahrain and UAE etc.

For Free Demo Contact us:
Name : Arunkumar U
Email : arun@maxmunus.com
Skype id: training_maxmunus
Contact No.-+91-9738507310
Company Website –http://www.maxmunus.com