


We saw the systems issues at a macro level, from the point of view of administering the engine that powers internet-scale computing. How do we program services such as web search and airline reservation on top of the data center cluster resources? How do they exploit the hardware parallelism that's available in these clusters? We will learn about the MapReduce programming paradigm that answers these questions. The term Big Data has become a buzzword. Computations in giant-scale services are usually simple, but they work over large data sets, hence the name Big Data. And naturally, because they are working with a large data set, these computations take a long time. For example, let's say that I want to search for John F. Kennedy's photographs in all the documents that are available on the web. It's going to take a long time, and that is an example of a Big Data computation. Similar examples include online reservations, shopping, et cetera. Applications that work on Big Data would like to exploit the parallelism that's available in the cluster at the data center. I mentioned in the last lesson that data centers these days comprise computational elements on the order of thousands, even 10,000, nodes that are ready to do computations. We would like to exploit the computational resources available in such big data centers to do computation on Big Data. For example, if I'm looking for John F. Kennedy's photographs in all the different documents, we can parallelize that search across all the documents, on all the available nodes in the data center. Such computations are also called embarrassingly parallel computations. What does this mean? Well, there is not much synchronization or coordination needed among the parallel threads that comprise the application and that run on different nodes of the computational cluster. 
Looking for John F. Kennedy's photographs in all the documents on the web is a good example of such an embarrassingly parallel application. What are the issues in programming in the large, on large data sets? The data sets are large, and the computational resources on which we want to run the big data application are also vast. So some of the interesting issues in programming in the large include how to parallelize an application across, let's say, 10,000 machines, and how to handle the data distribution and plumbing between the producers of data and the consumers of data. Remember, these are embarrassingly parallel applications, but different phases of the application will require data from one phase to be passed on to the next phase. That's what we mean by plumbing between the producers of data, intermediate data to be more specific, and the consumers of that intermediate data. And of course, one of the biggest challenges in Big Data applications on large computational clusters is failure handling. Recall what we said about the nature of these data centers. They employ thousands and thousands of processors. When you have so many parts, in any setting, failure is not a question of if it will happen. It is a question of when it is going to happen. That's a fact of life, and therefore programming models for Big Data have to worry about the fact that failures are to be expected in this environment.


In this lesson we're going to look at one specific example of a programming environment for dealing with big data applications running on large computational clusters. This programming environment is called MapReduce. In this programming environment, the input to the application is considered as a set of records identified by key-value pairs. The big data app developer supplies the runtime environment with two functions, called map and reduce. These are the only two functions that the app developer has to supply, and both map and reduce take user-defined key-value pairs as inputs and produce user-defined key-value pairs as outputs. So both the input and the output of each of the map and reduce functions written by the domain expert are key-value pairs. Once again, these key-value pairs are defined by the app developer and are specific to the particular application that he or she is coding. Let's consider a fun example. In this example, I'm going to say that we're looking to find specific names of individuals in a corpus of documents. Maybe the names are Kishore, Arun, Drew, and so on. These are the specific names that I want to find in a whole corpus of documents. So the input to the application is a whole corpus of documents, and we are looking for specific names and the number of times those names occur in those documents. That's what the application is going to do. The input key space for the application is the file name, and the value part of the key-value pair is the content of the file. So if you have n files in the corpus of documents that we want to analyze in this fashion, then you have n key-value pairs, one for each file and its contents. This is going to be the input to the whole application, and we're going to structure this application using the MapReduce programming paradigm. 
And we will see how the app developer will use the MapReduce framework to accomplish the goal that we set out, which is to find the number of occurrences of unique names in this corpus of documents. In this example, the user-defined map function is looking for each of the unique names that we're interested in. So the map function will take as input a specific file and the contents of that file as the key-value pair, and emit a value that corresponds to the number of times each one of these names occurs in the input file. Now, a simple version of the map function may emit a one every time it encounters the name Kishore in the input file, or the name Arun, or the name Drew. The output of the map function is a key-value pair, where the key is one of the unique names that you're looking for, and the value is the number that says how many times it found that particular name. As I said, a simple version of the map function could emit a value of one each time it finds one of the names it is looking for in the file. A slightly more elaborate mapper may actually combine the occurrences of a particular name in the input file and then emit, as a single key-value pair, the sum of all the times that name occurred. In either case, the output of the map function is itself a key-value pair. Note that it is different from the input key-value pair. The output key-value pair that the mapper emits is a unique name and a value that is associated with that unique name. From this example it should also be evident that we can spawn as many instances of the map function as the number of unique files that we have in the input document corpus. Further, since each instance of the map function is working on a different key-value pair as input, they're all independent of one another and there is no need for coordination among them. 
And that's the feature of an embarrassingly parallel application. The outputs of the mappers are the inputs to the reducers. In other words, the output key-value pair from a mapper has the same form as the input key-value pair for each of the reducers. Again, the reduce function is something that is written by the user. What each mapper does is this: when it finds a particular name that it is looking for, like Kishore, it sends the value associated with Kishore to the reducer responsible for Kishore. Another mapper similarly sends its value for Kishore to the same reducer. In the same fashion, all the mappers send the respective values that they found for the unique name Kishore to that one reducer. Similarly, the values found for the name Arun are sent by all the mappers to the reducer responsible for Arun, and so on. So the number of reducers that we will have is the same as the number of unique names that we are looking for in the corpus of documents. This is where work needs to be done by the programming environment to plumb the outputs of the mappers to the inputs of the reducers. Potentially, the mappers could be executing on different nodes of the cluster, and the reducers could be executing on different nodes of the cluster. The work involved in this plumbing is really making sure that the outputs of the mappers are fed to the correct reducers across the entire distributed system. For example, the output corresponding to Kishore has to be sent from all the mappers to the reducer for Kishore. The output corresponding to Arun has to be sent from all the mappers to the reducer for Arun, and so on. That's what we mean by the plumbing that needs to be done by the programming environment to facilitate the communication between the instances of the mapper and the instances of the reducer. 
The work performed by each one of these reducers is aggregation of the values that it got for one of the unique names that we're looking for in the input document corpus. So the job of the reducer for Kishore is to aggregate all the instances of Kishore that it got from all the mappers. It receives key-value pairs consisting of the name Kishore and the number of instances that each mapper found in the input key-value pair that it processed, and the reducer aggregates those counts. The output of all the reducers is once again a key-value pair. It has exactly the same format as the input key-value pair that the reducers got. Namely, the key is the unique name that the reducer has been assigned to aggregate, and the value is the total number of occurrences of this unique name in the corpus of documents we started with. So this is the roadmap for creating a MapReduce application for a chosen application that works on big data. In this case, the chosen application was looking for unique names in a corpus of documents. All that the domain expert had to worry about is deciding the format of the key-value pair that is the input to the whole application, in particular to the map phase; the format of the intermediate key-value pair that is generated by the map function; and the work that needs to be done in each of the mapper and the reducer for this particular application. Beyond that, the app developer does not have to worry about any of the other details, such as how many instances of mappers to create, how to create the number of instances of reducers that the application needs, or the plumbing from the outputs of the mappers to the inputs of the reducers. All of those are things that the app developer does not have to worry about.
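The name-counting example can be sketched in a few lines of Python. This is a toy, single-process illustration rather than a real MapReduce runtime: `map_fn`, `reduce_fn`, `run_mapreduce`, and the sample corpus are hypothetical names and data chosen for this sketch, and the grouping step inside `run_mapreduce` merely stands in for the plumbing that the framework would do across machines.

```python
from collections import defaultdict

# The names we are looking for (taken from the lecture's example).
NAMES = {"Kishore", "Arun", "Drew"}

def map_fn(filename, contents):
    """Input pair: (file name, file contents). Emits (name, 1) per occurrence."""
    for word in contents.split():
        if word in NAMES:
            yield (word, 1)

def reduce_fn(name, counts):
    """Input: one name and all values emitted for it. Emits (name, total)."""
    return (name, sum(counts))

def run_mapreduce(corpus):
    """Toy stand-in for the runtime: run the mappers, group by key, run the reducers."""
    grouped = defaultdict(list)
    for filename, contents in corpus.items():
        for key, value in map_fn(filename, contents):
            grouped[key].append(value)
    return dict(reduce_fn(key, values) for key, values in grouped.items())

# Hypothetical two-file corpus.
corpus = {
    "a.txt": "Kishore met Arun and Kishore left",
    "b.txt": "Drew called Kishore",
}
result = run_mapreduce(corpus)
print(result)
```

Only `map_fn` and `reduce_fn` correspond to what the app developer writes; everything in `run_mapreduce` is work that the framework would take over.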

Why MapReduce

The question is, why MapReduce as a programming framework for big data applications? It turns out that several processing steps in giant-scale services are expressible as MapReduce. For example, let's say that you're looking for seat availability for selected dates to selected destinations on different airlines. That can be expressed as a MapReduce computation. Let's say you want to find the access frequency of the URLs on the website that you've designed. That can be coded up as a MapReduce application. Let's say you want to create word indexes to facilitate document searches on the web. That can be coded up as a MapReduce application. Or let's say that you want to do ranking of pages. When a user is doing a search, how to present the search results may depend on the popularity of pages, and that is what is often referred to as page ranking. So if page ranking has to be done for all the documents, that is another application that can be coded up as a MapReduce application. The list goes on. All the examples that I mentioned share some common properties. They're embarrassingly parallel, they're common in giant-scale services, and all of them tend to work on big data sets. Therefore there is plenty of opportunity for taking advantage of the computational resources that are available in a data center. And all such applications need domain expertise in terms of what to do with the data, which is expressible as a map function and a reduce function. This is the only thing that the domain expert is required to provide to the programming system, because that is domain expertise that lives with the app developer. So here is another example of how an application may be structured as a MapReduce application. In this case, I'm showing you a page ranking application. That is, I'm interested in knowing the popularity of different URLs that occur in a document corpus. So the key space that is input to this application is a set of URLs. 
And the key-value pair that comes into each of the mappers is a source URL and the contents of the web page that corresponds to that URL. What the mapper does is look for different targets in the given page, that is, in the contents that were handed to it as input. Maybe it is looking for a particular URL, target 1, another URL, target 2, and so on up to target n, and that's what each of these mappers is doing. So the key space that is output from the mapper is the unique target URLs, and the value is the source URL in which the target was actually found. The mapper takes an input URL from the corpus and says, well, in this particular URL I found this target, if it did. It emits the fact that this target was found in a particular URL. Each reducer is going to get all the hits for a particular target that was found in the input corpus of URLs. So all the mappers are going to send their results to the reducer for target 1 if they found target 1 in their input source URL. Similarly, if they found target n, each of these mappers is going to tell the reducer for target n that it found target n in its input URL. The job of the reducer is once again aggregation, and you have as many reducers as the number of unique targets you're trying to identify in the input data set, very similar to the previous application that we went over. So the output of the reducer is going to be the specific target that this reducer has been asked to accumulate, and a source list, meaning all the source pages in which this particular target was found. So each of these reducers is going to find the number of times a particular URL is found in the input corpus of web pages that came into the system as a whole. 
For instance, if I wanted to find out how many times my web page appears in the universe of web pages all over the world, we can take the entire corpus of web pages available in the universe as input, and the mappers are going to look for occurrences of my web page in each one of those input web pages. If they find it, they're going to send that to the reducer for my page. If target 1 corresponds to my web page, then that reducer is going to say, okay, Kishore's web page was found in this list of source web pages all over the universe. That, in a sense, gives a rank for the particular web page that we're looking at. So we're able to rank the target web pages 1 through n based on the number of source web pages that contain that particular target. And that's what page ranking is all about. So I'm giving you yet another example of how this MapReduce functionality can be applied to an application such as page ranking. All the heavy lifting that needs to be done, in terms of instantiating the mappers, instantiating the reducers, and moving the data between the mappers and the reducers, is taken care of by the programming environment. All that the domain expert had to do was write the map and reduce functions that are unique to the specifics of his application. The rest of the heavy lifting is all done by the programming framework.
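As a rough sketch of the page ranking example as described in this lesson, the mapper below emits a (target URL, source URL) pair for every link it finds, and the reducer collects the source list for one target. The regular expression, the function names, and the `example` URLs are all assumptions made for illustration; the rank is taken to be the length of the source list, following the lecture's description, and not any production ranking algorithm.

```python
import re
from collections import defaultdict

# Assumed link syntax for this sketch: href="..." attributes in the page text.
LINK = re.compile(r'href="([^"]+)"')

def map_fn(source_url, contents):
    """Input pair: (source URL, page contents). Emits (target URL, source URL)."""
    for target in set(LINK.findall(contents)):  # count each source page once per target
        yield (target, source_url)

def reduce_fn(target_url, source_urls):
    """Aggregates every source page in which one target was found."""
    return (target_url, sorted(source_urls))

def run(pages):
    """Toy stand-in for the runtime's grouping of mapper output by target."""
    grouped = defaultdict(list)
    for source_url, contents in pages.items():
        for target, source in map_fn(source_url, contents):
            grouped[target].append(source)
    return dict(reduce_fn(t, s) for t, s in grouped.items())

# Hypothetical three-page corpus.
pages = {
    "http://a.example": '<a href="http://k.example">k</a> <a href="http://b.example">b</a>',
    "http://b.example": '<a href="http://k.example">k</a>',
    "http://c.example": '<a href="http://k.example">k</a> <a href="http://k.example">again</a>',
}
source_lists = run(pages)
rank = {target: len(sources) for target, sources in source_lists.items()}
print(rank)
```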

Heavy Lifting Done by the Runtime

Now let's look at all the heavy lifting that needs to be done by the runtime in order to facilitate this MapReduce style of programming. The app developer writes his map function and reduce function, and instantiates the programming library by calling MapReduce. The programming library splits the input files provided by the user, that is, the key-value space provided by the user, into M splits. The number of splits of the input key-value pairs, namely M, can be determined automatically by the programming system, or it can be specified by the user. In any case, once M is specified or automatically determined by the programming framework, it splits the input key-value space into M splits. The next thing that happens in the programming environment is to spawn the master for this particular MapReduce computation, and all the worker threads that are going to carry out the work involved in the map functions and reduce functions. The master is special because the master is the one that is overseeing the whole operation and keeping tabs on which workers are doing what functions, when they are done, when to start new work, and when to say that the computation is done. All of those chores are orchestrated by the master. The next thing that happens is that the master assigns some number of worker threads as the mapper worker threads, and the number of mapper worker threads may correspond to the number of splits that it made in the beginning. So if there are M splits, then M worker threads are going to be assigned to the mapping function. Each worker thread takes one of the input splits and applies the map function to that particular split. The next thing that happens is that the master assigns reduce tasks to some number of worker threads, and the number of reducers to be assigned to the workers, R, is something that is decided by the application. 
Recall the example that I gave you about looking for specific names in an input corpus. The number of unique names is something that the app developer specifies. That's where the number R comes from. That's the number of splits that the user is specifying, and that parameter is used by the master to assign some number of workers as reducers. The next thing that the master does is plumb the mappers to the reducers, because when the mappers produce their output, that output has to be sent over to the consumers of the intermediate results, namely the reducers. Setting up this communication path between the producers of data, the mappers, and the consumers of data, the reducers, is the plumbing that the master does as the next thing. Now it's time to get to work. In the map phase, each of the workers assigned to the mapping function is working on a particular split. What they do is read from the local disk the split that they've been assigned, parse the input, and then call the user-defined map function. The user-defined map function is the one that does the core of the work that needs to be done on the input data set to produce the intermediate output. The rest are things that need to be done in order to facilitate the work to be carried out by the domain expert in the map function. The intermediate key-value pairs that are produced by the mapper are buffered in memory. So each one of these workers is processing a portion of the input key-value space and producing the respective outputs. Periodically, the intermediate results are written to files, which are on the local disks of the respective workers. So one worker writes to its local disk the intermediate files corresponding to the output of its map function. 
Similarly, another worker writes its intermediate files to its own local disk, and so on. Because the application developer has specified that there are R splits in the reducers, each worker, meaning each map function that is associated with that worker, produces R intermediate files, one for each of the R workers that are going to be doing the reduce operation. All these intermediate files are stored on the local disk associated with each of the computational nodes carrying out the worker function for the map phase. When a worker is done with the map operation for the split that it is handling, it informs the master that it is done. What the master is waiting on is for all of these mappers to get done before letting the reduce workers get going. So in this sense, the master is like the conductor of an orchestra. It has started the mapping function and is waiting for all of these workers to get done. When they indicate that they have done the work by notifying the master, and all of the M map tasks that have been assigned to these workers have completed, then the master says, okay, now it is time to move on to the reduce phase.
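To make the idea of "R intermediate files per mapper" concrete, here is a minimal sketch of how a map worker might bucket its output so that every pair with the same key lands in the bucket meant for the same reducer. Hashing the key modulo R is an assumption of this sketch, `zlib.crc32` is just a convenient deterministic hash from the Python standard library, and in-memory lists stand in for the files on the worker's local disk.

```python
import zlib

def partition(key, num_reducers):
    """Pick the reducer index for a key (assumed scheme: hash(key) mod R)."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers

def partition_map_output(pairs, num_reducers):
    """Split one mapper's output into R buckets, one per reduce task."""
    buckets = [[] for _ in range(num_reducers)]
    for key, value in pairs:
        buckets[partition(key, num_reducers)].append((key, value))
    return buckets

# Hypothetical output of one mapper in the name-counting example.
pairs = [("Kishore", 1), ("Arun", 1), ("Kishore", 1), ("Drew", 1), ("Arun", 1)]
buckets = partition_map_output(pairs, num_reducers=3)
for index, bucket in enumerate(buckets):
    print(index, bucket)
```

Because every mapper applies the same partition function, reducer i only ever needs bucket i from each mapper.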

Heavy Lifting Done by the Runtime (cont)

In the reduce phase, there is more work to be done in pulling the data that is needed for each one of these reducers. We know that the mappers have been executing on different nodes of the computational cluster, and they produced their intermediate results as files on their local disks. A worker that is carrying out a particular split of the reduce operation has to reach out and pull the data from all of the M mappers that have stored their intermediate results on their respective local disks. So there is a remote read involved: the first thing in the reduce phase is to pull the data. This is part of what I mean by the plumbing that the runtime system provides. It recognizes that, for this reduce operation to work, it needs the mapping results from all the M nodes that carried out the map function. So it is going to do RPCs in order to get all this data from the local disks of the nodes on which the map was executed. Once it has all the data, it can sort it and then call the reduce function. The reduce function is the one that has been written by the domain expert. This is the point at which the domain expertise comes in, in saying, well, I've got the data now, let me do the processing that I want to do for the reduce operation. The sorting that is done as part of the programming framework is to sort the input that is coming in from all of these different mappers, so that all the same keys are together in the input data set that is going to be given to the reduce function. Once such sorting has been done, the programming framework will call the user-supplied reduce function for each key with the set of intermediate values, so that the reduce function can do its thing, which is domain specific. Each reduce function will then write to the final output file specific to the partition that it is dealing with. 
So, if you think about the original example that we started with, if, let's say, one reducer is accumulating all the instances of the name Kishore, then it will write an output file that says, I've found so many instances of the name Kishore in the input corpus of data. Similarly, another reducer may be doing it for another name like Drew, or Arun, and so on. Once each worker has completed its work by writing the final output file for the partition that it is responsible for, it informs the master that, yes, I'm done with the work that was assigned to me. The user program can be woken up when all the reducers have indicated to the master that they have done the work, and at that point the MapReduce computation that was initiated by the user program is complete. We said that there could be M splits of the input data set, meaning the input key-value pairs, and there could be R splits of the output that has to be generated by the application as a whole. Now, the number of machines available in the data center, N, may be less than the sum M plus R. So there may not be a unique machine to assign to each one of the M splits that have been made. It is the responsibility of the master to manage the machines and assign the machines that are available to handling the M input splits as well as the R reduce splits that need to be generated. This is part of the heavy lifting that has to be done by the runtime. So, for instance, let's say that I have only 100 worker nodes available as mappers, and I have 1000 input splits. Then what I'm going to do is assign one split to each worker. When a worker says I'm done with it, then I'd say, oh, you're done? Okay, take the next split and work on it. Take the next split and work on it. That's how we're going to manage the available resources for carrying out the work that needs to be done. 
So remember that this is all done as part of the heavy lifting by the runtime, and the user doesn't have to worry about it. All that the user did was write the map function and write the reduce function, and the rest of it is magic so far as the MapReduce framework is concerned. Similarly, the number of R splits specified by the user may be more than the number of workers that are available to carry them out. In that case, again, the master is going to assign a particular split to a worker so that it can compute and generate the output file corresponding to that split. Once the worker is done with that and notifies the master, the master will say, okay, now that you're done with that, work on the next split. And the worker will do all the work that is associated with the plumbing, meaning bringing in the data for the split that it is working on right now, sorting it to put all the corresponding keys together, and then finally calling the reduce function that has been written by the user. That's the kind of work that goes on under the covers in the MapReduce programming framework.
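The pull, sort, and reduce sequence can be sketched as follows. The remote reads are not modeled here: `fetched_buckets` is a hypothetical stand-in for the data that one reduce worker has already pulled from each of the M mappers, and `reduce_phase` and `sum_counts` are names invented for this illustration.

```python
from itertools import groupby
from operator import itemgetter

def reduce_phase(fetched_buckets, reduce_fn):
    """Merge what was pulled from every mapper, sort by key, reduce per key."""
    pairs = [pair for bucket in fetched_buckets for pair in bucket]
    pairs.sort(key=itemgetter(0))            # brings identical keys together
    output = []
    for key, group in groupby(pairs, key=itemgetter(0)):
        values = [value for _, value in group]
        output.append(reduce_fn(key, values))  # user-supplied, domain-specific
    return output

def sum_counts(name, counts):
    """User-written reduce function for the name-counting example."""
    return (name, sum(counts))

# Hypothetical data pulled from three mappers for this reducer's partition.
fetched_buckets = [
    [("Kishore", 2), ("Arun", 1)],
    [("Kishore", 1)],
    [("Arun", 4)],
]
final_output = reduce_phase(fetched_buckets, sum_counts)
print(final_output)
```

Sorting first is what lets the framework hand each key to the reduce function exactly once, together with all of its values.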
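The "take the next split and work on it" policy can be illustrated with a small simulation. This is only a sketch: `run_phase` is a hypothetical name, it assumes at least one worker, and a completion message is simulated by simply picking the worker that has been running longest, whereas a real master would react to messages arriving over the network.

```python
from collections import deque

def run_phase(num_tasks, num_workers):
    """Hand out splits to a fixed pool of workers, one split per worker at a time."""
    pending = deque(range(num_tasks))    # splits not yet assigned
    idle = deque(range(num_workers))     # workers with nothing to do
    running = {}                         # worker id -> split it is processing
    log = []                             # (worker id, split id) in assignment order
    while pending or running:
        # Give every idle worker the next pending split.
        while pending and idle:
            worker = idle.popleft()
            split = pending.popleft()
            running[worker] = split
            log.append((worker, split))
        # Simulate one "I'm done" message arriving at the master.
        worker = next(iter(running))
        del running[worker]
        idle.append(worker)
    return log

# The lecture's numbers: 1000 input splits, 100 workers available as mappers.
log = run_phase(num_tasks=1000, num_workers=100)
print(len(log), "assignments made by", len({w for w, _ in log}), "workers")
```

The same loop would serve for the reduce phase, with R reduce splits in place of the M input splits.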

Issues to be handled by the Runtime

There is lots of work to be done by the runtime system to manage a MapReduce computation. The master data structures include the location of the files created by the completed mappers. Remember that each mapper is working on some node of the computational cluster, producing its output as files on its local disk. So the master has to have knowledge of where the files created by those mappers reside, and the namespace of the files that have been created by the mappers. It also has to keep a scoreboard of the mappers and reducers that are currently assigned to work on different splits, because the number of machines that are available may be less than the total number of machines that would be needed if I wanted to do all of the M input splits in parallel and all of the R output splits in parallel. Therefore, the scoreboard says, at any point in time: Who are all the workers carrying out mapper functions? Who are all the workers carrying out reducer functions? When are they done? And when they are done, how should I reassign that worker to a new task? These are the kinds of things that the master data structures enable the master to do. Then the big thing is fault tolerance. For a variety of reasons, the master may find that an instance of a map function that it started is not responding in a timely manner. Maybe that node is down for some reason, maybe the link is down for some reason, or maybe that processor is taking a little more time than what the master expects. It can happen very easily, because in a data center environment where you have thousands of machines, there could be some machines that are slower than other machines. There could be generational differences between the machines that populate the data center, even though they may be homogeneous in terms of capabilities, programming environments, and even machine architecture. 
They may have speeds that are different because they correspond to different generations of processors. In any event, what might happen is that the master may notice that there is no timely response from a particular instance, let's say of a mapper. In that case, it might say, well, I'm going to go ahead and assume that that mapper is dead, and I'm going to restart that mapping function on a different node of the cluster. Now, when that happens, it is possible that the original mapper is actually dead, or it could be that it was just slow. In that case, you could get completion messages from more than one mapper for the same split. So when you get multiple completion messages from redundant stragglers, the master has to be able to filter out the redundancy and say, this is redundant work, I don't have to care about that. That's part of the fault tolerance that the master has to worry about. Locality management is another important thing. Making sure that the working set of a computation fits in the closest level of the memory hierarchy of a processor is very important, and this is another thing that has to be managed very carefully so that the computation can make good forward progress in completing the MapReduce function. There is an inherent assumption in the fault tolerance model of the MapReduce framework, and that is idempotency of the mapper. That is, even though the same input data split is being worked on by multiple mappers, it doesn't affect the semantics of the computation as a whole. That is the reason why the master can simply ignore the redundant straggler's message if in fact it was a slow computation engine that was working on a particular map function. If it finishes later than when it was supposed to, and the master has already started another redundant computation to take care of that particular split, ignoring that redundant straggler's message is okay because of the idempotency assumption about the mapping operation. 
In a similar manner, the master may assign a new node to carry out the reduce function corresponding to a particular split if there's no timely response from one of the reduce workers. Here again, the output file generation that is associated with a particular reducer split depends on the atomicity of the rename system call that is used to finally say, this is the output file generated, and the master says, okay, I'm going to commit this as the real output file of the computation. Each reducer is writing a local file, and when the master gets the notification from the reducer that it has completed its work, at that point the master renames that local file as the final output file. This is where it can get rid of redundant stragglers who come along later and say, I produced the same output file for the same split. The master will say, I already got it, I don't need this, and it can ignore the completion message from that redundant straggler. The other thing that the programming environment has to worry about is locality management, and this is accomplished using the Google File System, which provides a way by which data can be efficiently migrated from the mappers to the reducers. Task granularity is another important issue. As I mentioned, the number of nodes available in the computational cluster may be less than the sum M plus R, where M is the number of input splits and R is the number of reducer splits. It is the responsibility of the programming framework to come up with the right task granularity so that there can be good load balance across the computational resources. The programming framework also offers several refinements to the basic model. The partitioning of the input data space is done by using a hash function that is built into the programming environment. 
But the user can override that partitioning function with his or her own partitioning function if they think that it will result in a better way of organizing the data. In the MapReduce framework, there is no interaction between the mappers, so if there are some ordering guarantees needed in terms of ordering the keys and so on, that is something that the user may have to take care of. The other thing that the user may also do is combining, a partial merging to increase the granularity of the tasks that execute a mapping function. Remember that when we looked at the simple example of looking for specific names in an input corpus of data, I mentioned that the mapper can be as simple as emitting a one every time it encounters the name Kishore, for instance, in an input file. Or it could take an input file, count all the occurrences of Kishore in that input file, and finally emit the total as the output of the mapper. That kind of combining function is something that a user may choose to incorporate in writing his map and reduce functions. It is very important to recognize that in order for the fault tolerance model of the programming environment to work correctly, the map and reduce functions have to be idempotent. That's a fundamental assumption that the fault tolerance model of the programming framework is based on. There are also other bells and whistles in the programming framework for getting status information, logs, and so on, but the basic idea I want to leave you with is that the programming framework is a very simple one. It has two operations, map and reduce, and using those two operations you can specify a complex application that you want to code up to run on a data center, using the computational resources that are available in the data center.
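A minimal sketch of how a master could filter redundant completion messages is shown below. The `Master` class, its method names, and the worker and file names are all hypothetical; the sketch assumes that the first completion message for a split wins and that, because the mapper is idempotent, any later message for the same split can be dropped without changing the result.

```python
class Master:
    """Toy scoreboard that records which map splits have completed."""

    def __init__(self, num_splits):
        self.num_splits = num_splits
        self.completed = {}   # split id -> (worker id, intermediate file names)

    def on_map_done(self, split_id, worker_id, files):
        """Handle a completion message; return True if the result was accepted."""
        if split_id in self.completed:
            # A redundant straggler finished a split that is already done.
            return False
        self.completed[split_id] = (worker_id, files)
        return True

    def map_phase_done(self):
        return len(self.completed) == self.num_splits

master = Master(num_splits=2)
first = master.on_map_done(0, "worker-7", ["w7-r0", "w7-r1"])   # restarted copy finishes first
late = master.on_map_done(0, "worker-1", ["w1-r0", "w1-r1"])    # original was only slow
other = master.on_map_done(1, "worker-2", ["w2-r0", "w2-r1"])
print(first, late, other, master.map_phase_done())
```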
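The difference between the simple mapper and the combining mapper can be seen in a short sketch. The function names and sample file are hypothetical; the point is only that both mappers lead to the same totals after the reduce step, while the combining mapper emits fewer intermediate pairs that have to be moved to the reducers.

```python
from collections import Counter

NAMES = {"Kishore", "Arun", "Drew"}

def map_simple(filename, contents):
    """Emit (name, 1) every time a target name is encountered."""
    return [(word, 1) for word in contents.split() if word in NAMES]

def map_combining(filename, contents):
    """Count occurrences locally and emit one (name, total) pair per name."""
    counts = Counter(word for word in contents.split() if word in NAMES)
    return list(counts.items())

def reduce_all(pairs):
    """Sum the values per key, as the reducers would after grouping."""
    totals = Counter()
    for name, value in pairs:
        totals[name] += value
    return dict(totals)

# Hypothetical input file.
text = "Kishore Kishore Arun Kishore Drew Arun"
simple_pairs = map_simple("f.txt", text)
combined_pairs = map_combining("f.txt", text)
print(len(simple_pairs), len(combined_pairs))
print(reduce_all(simple_pairs) == reduce_all(combined_pairs))
```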


The power of MapReduce is its simplicity. The domain expert has to write just the map and reduce functions for his application. All the heavy lifting is managed by the runtime system under the covers.