



Programming Paradigms

We have seen how pthreads serve as an API for the development of parallel programs. Similarly, sockets serve as the API for distributed program development. Conventional distributed programs are built using the socket API: for example, you may have a server like an NFS server, and clients connect to it using sockets (building RPC on top of that) in order to do the distributed I/O between the client and the server. But unfortunately, the socket API is too low-level, and it does not have the semantic richness that is needed for emerging novel multimedia distributed applications.

Novel Multimedia Apps

Novel distributed multimedia applications tend to be sensor-based. These sensors can be simple, like temperature and humidity sensors, or more interesting and complex, like cameras, microphones, radars, and so on. The sensors are distributed, which means that you have to access them via the Internet. What we want to do with these sensor streams is live stream analysis, and such applications are often called situation awareness applications: we gather sensor data in order to analyze in real time what is happening in the environment and take appropriate decisions.

Situation awareness applications exhibit a control loop going from sensing to prioritizing the sensed data to figure out what is important. For instance, if there are lots of cameras and there is no action in front of some of them, some cameras are more interesting than others. That is the prioritization step: figuring out which sensors are interesting, then devoting more computational resources to analyzing those camera streams or other sensor streams, and, as a result, taking some actions. This might involve actuating other sensors, triggers, other software entities, or even humans to intervene with whatever actions need to be taken. Part of this control loop may also be feedback to the sensors themselves to re-target them. For example, there may be a camera that you want to point in a different direction; there are pan-tilt-zoom cameras that you may want to control depending on what you are observing in the environment.

This control-loop characterization of situation awareness applications applies to many emerging distributed multimedia sensor-based applications, such as traffic analysis, emergency response, disaster recovery, robotics, asset tracking, and so on. All such applications are computationally intensive. And because they deal with real live streams coming from the sensors, they have real-time properties, which means there is a need to shrink the latency from sensing to actuation, so that we can take timely actions based on the sensed data. Since they are computationally intensive, we need horsepower to run these large-scale novel multimedia distributed sensor-based applications, and so computational engines such as clusters and clouds may be deployed to cater to their needs. A minimal sketch of this control loop appears below.
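Here is a minimal Python sketch of the sense-prioritize-analyze-actuate loop just described. Everything in it, the Camera class, its methods, and the activity score, is a hypothetical stand-in chosen purely for illustration; no real sensor or vision API is implied.

```python
# Illustrative sketch (not any real API) of the situation-awareness
# control loop: sense -> prioritize -> analyze -> actuate, with feedback.

import random
import time

class Camera:
    def __init__(self, cam_id):
        self.cam_id = cam_id

    def sample(self):
        # Stand-in for grabbing a frame; "score" models the activity level.
        return {"cam": self, "frame": object(), "score": random.random()}

    def retarget(self, direction):
        # Feedback to the sensor, e.g., a pan-tilt-zoom command.
        print(f"camera {self.cam_id}: retargeting toward {direction}")

def control_loop(cameras, budget=2, rounds=3):
    for _ in range(rounds):
        readings = [c.sample() for c in cameras]                  # sensing
        # Prioritization: spend compute only on the most active streams.
        hot = sorted(readings, key=lambda r: r["score"], reverse=True)[:budget]
        for r in hot:                                             # analysis
            if r["score"] > 0.9:                                  # inference
                print(f"ALERT from camera {r['cam'].cam_id}")     # actuation
                r["cam"].retarget("region of interest")           # feedback
        time.sleep(0.1)

control_loop([Camera(i) for i in range(5)])
```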

Example: Large-Scale Situation Awareness

An example of a large-scale situation awareness application would be monitoring activities in an airport. In that situation, what we are trying to make sure is that normal activities are okay, but anything anomalous with respect to what we see in the environment has to result in triggers being sent to the appropriate software agents, or even humans, in terms of actions they may need to take. Camera networks are becoming ubiquitous: in the city of London, for instance, there are on the order of 400,000 cameras deployed, blanketing the city. The intent in such camera networks is to analyze the camera streams to detect anomalous situations.

When you have such a large number of sensors, the first thing one has to worry about is overloading the infrastructure itself with the amount of data that is being continuously produced, 24/7, from all of these sensors. That suggests it is important to prune these sensor streams at the source in order to make sure infrastructure overload is avoided. The traditional way to do surveillance is to have humans sitting in front of banks of monitors looking for any anomalous activity, for instance in an airport. This obviously does not scale if you have on the order of 100,000 or 200,000 cameras blanketing a city, which is becoming fairly common in many large cities like New York, Chicago, London, and so on. So cognitive overhead is another big thing you have to worry about in large-scale situation awareness applications.

It is also extremely important to avoid false positives and false negatives. A false positive is identifying an event as anomalous when it is not really anomalous; a false negative is an event that should have been caught but did not get caught. Both of these are very important metrics that one has to measure in large-scale situation awareness applications.

From a programming model perspective, one of the important takeaways from this discussion is that the programming infrastructure has to facilitate the domain expert, who is developing a large-scale situation awareness application, in dealing with time-sensitive data. So what are the pain points in developing such applications? The first is providing the right level of abstractions, promoting ease of use; simplicity is key. Next are interfaces that allow seamless migration of computation between the edge of the network and the computational resources in a data center, for instance. Then there is the temporal ordering of events taking place in the distributed system, and propagating temporal causality. By that, what I mean is that if an event was captured by a camera at a particular point of time, it may be processed at a later point of time, but there has to be a temporal causality linking the time at which it was captured to the digest of information that was created from it. Such temporal causality is extremely important in the development of large-scale situation awareness applications. Another requirement is that while we are dealing with live data, we may also have to correlate them with historical data in order to make a high-level inference about what is going on. For instance, if we see a speeding car at this point of time on the highway, we may want to ask the question: was this car involved in an incident in the last n days? That is historical data that has to be correlated with live data. A programming infrastructure that provides facilities for these kinds of things would make the life of the domain expert writing such large-scale situation awareness applications so much simpler.

Programming Model for Situation Awareness

Let's look at the computation pipeline for a video analytics application. Let's say I'm interested in detecting and tracking some anomalous event. So I have cameras, which capture images from a particular environment, and we are detecting for something specific. Maybe we are looking for faces in every camera frame, and once we detect such a face, we say: this is a suspicious individual, let's track him as he moves around. Tracking takes a particular object of interest for the domain and asks what is happening to that object as time moves on. In the process of tracking, you are recognizing who that person is; if there are multiple people in a particular environment, you are recognizing the specific individual you may be interested in. And if that individual is recognized, then perhaps you want to raise an alarm. So this is a simple pipeline of tasks to illustrate what a domain expert may be doing in a video analytics application. Such a domain expert is an expert in writing detection, tracking, and recognition algorithms, so that they can process the video stream in real time to generate alarms.

The objective in situation awareness applications is to process these streams of data for high-level inference. In other words, we are not watching a YouTube video; we are using the sensor streams to derive high-level inferences about what is happening in the environment. That is the nature of situation awareness applications. As I mentioned, video analytics is in the purview of a domain expert like a vision researcher or developer who knows how to write sophisticated detection, tracking, and recognition algorithms. But how do we scale that up to thousands of cameras? If an object is moving from one camera to another, these are the kinds of things the domain expert for situation awareness may have to worry about, and this is where systems can come in with programming models that alleviate the pain points of a domain expert developing situation awareness applications. Persistent Temporal Streams (PTS), which is the focal point of this particular lesson, is just one exemplar of a distributed programming system catering to the needs of situation awareness applications such as what I described to you just now. I want to stress that PTS is just an exemplar, not the last word. But it is good to look at one concrete example of a distributed programming system that can help reduce the pain points in developing situation awareness applications.
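To make the pipeline concrete, here is a minimal sequential sketch of it in Python. The detect, track, and recognize bodies are trivial stand-ins for the domain expert's real vision algorithms; only the shape of the pipeline (capture, detect, track, recognize, alarm) matters here.

```python
# Sketch of the sequential video-analytics pipeline just described.
# The stage bodies are stand-ins, not real vision algorithms.

def detect(frame):
    # Stand-in detector: pretend each frame yields one blob of interest.
    return [frame["subject"]]

def track(blobs, tracks):
    # Stand-in tracker: remember each detected object across frames.
    for b in blobs:
        tracks[b] = tracks.get(b, 0) + 1
    return tracks

def recognize(tracks, suspects):
    # Stand-in recognizer: match tracked objects against known suspects.
    return [t for t in tracks if t in suspects]

def video_analytics(frames, suspects):
    tracks = {}
    for frame in frames:                       # the capture loop
        tracks = track(detect(frame), tracks)
        for hit in recognize(tracks, suspects):
            print("ALARM:", hit)               # raise an alarm on a match

video_analytics([{"subject": "alice"}, {"subject": "bob"}], suspects={"bob"})
```

Later sections show how PTS turns exactly this kind of sequential pipeline into a distributed one.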

PTS Programming Model

The PTS programming model is very simple and straightforward. A distributed application consists of threads and channels; these are the two high-level abstractions provided by the programming model. The computation graph that you generate using the PTS programming model, in particular the thread and channel abstractions it provides, looks very similar to a UNIX process-socket graph. So for a programmer familiar with the socket API, the transition to programming using the abstractions provided by PTS is not significant. While the computation graph looks similar to a process-socket graph, the semantics of the channel abstraction are very different from the socket abstraction. In particular, what the channel holds are time-sequenced data objects: a thread generating time-sequenced data objects can place those objects into the channel. The other thing you will notice from the structure of this computation graph is that there can be multiple producers of data that go into a given channel, and similarly there can be multiple consumers getting data from a given channel. In other words, a channel allows many-to-many connections.

A thread that produces some data and wants to put it into a channel uses the primitive provided for operating on the channel, namely put. The put primitive takes two arguments: the item itself, which is whatever the application decides as the content of the data it wants to put in the channel, and the timestamp the thread associates with the data it is producing and placing in the channel. So if you look at the contents of any given channel, it is really showing a temporal evolution of the data being produced by a particular thread. For example, a thread could be capturing data from a camera, and every item that it produces is one frame of image captured from the camera, associated with the wall-clock time at which that particular frame was captured.

A computation that needs to use the data will do a get on a channel, and when it does a get it has to specify two arguments: a lower bound for the timestamp it is looking for, and an upper bound. In other words, a thread that wishes to get data from a channel will specify in the get primitive that it wants the data in that channel starting at a particular time, say 1:05 p.m., up to an upper bound, say 1:06 p.m. What the thread gets back is all the items that the channel contains between this lower bound and upper bound. The programming model also allows an application to specify time in an abstract way: it could say, I want the oldest item in the channel, or the newest item in the channel, in addition to specifying explicit lower-bound and upper-bound markers if it so chooses. So a channel contains a continuous stream of data items snapshotted with different timestamps associated with them.

This is the way one could write a program that uses the primitives provided by the PTS programming model. Let's say we are writing the code for a detector that is looking at the video stream produced by a particular camera. In that case, the detector will associate a variable, channel one, with the video stream that corresponds to what that camera is producing.
Continuously, what it is going to do is get a camera image from that particular channel, possibly specifying a lower bound and an upper bound on the time it is looking for; process the data; produce an output; and place the output it produced, as a digest of the information it processed, into another channel. It does that by putting the digest as a new item and associating a timestamp with it. This is where the programming model facilitates temporal causality being propagated in the distributed system. If this thread is a detector thread looking at the camera images produced by a camera thread, it can take the camera images, process them, and place the result in its output channel; and when it does so, it can use the same timestamp that it got for an input image in doing the put operation. That way, temporal causality is propagated through the graph by the computation, using the primitives available from the PTS programming system.

Quite often in situation awareness applications, a computation may have to use data from several different streams in order to do its high-level inferencing. So a particular thread may get information from several different streams and make a high-level inference based on the composite results it is getting from all these streams. Since all the data coming from the channels have timestamps associated with them, an intermediate computation that is taking multiple streams and generating a high-level inference from them can use the timestamps in the incoming streams to see which data items are temporally correlated with one another, and use that information to improve the kind of inferencing and hypotheses that can be derived in the situation awareness application.

In a nutshell, the PTS programming model allows, first, the ability to associate a timestamp with data items produced by a computation. Second, the model allows propagation of temporal causality, because a computation can associate a timestamp with an item it produces based on the timestamp it got on the data it obtained from its input channel. And third, the fact that every stream is temporally indexed allows a computation to correlate the incoming streams and recognize which data items in the input streams are temporally related to one another, by looking at the timestamps associated with the items it fetches from the different channels. A toy sketch of these primitives follows.
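To ground the primitives, here is a toy, in-memory stand-in for a channel and a detector thread that uses it. The class name, method signatures, and detector logic are illustrative assumptions only; real PTS channels are network-wide named entities implemented by the runtime system, not a Python class.

```python
# Toy stand-in for a PTS channel with put(item, ts) and get(lb, ub).

import bisect
import threading

class Channel:
    def __init__(self):
        self._items = []                 # list of (timestamp, item), kept sorted
        self._lock = threading.Lock()

    def put(self, item, ts):
        # Append to the temporal evolution of the stream.
        with self._lock:
            bisect.insort(self._items, (ts, item))

    def get(self, lb, ub):
        # Return all items with lb <= timestamp <= ub.
        with self._lock:
            return [(ts, it) for ts, it in self._items if lb <= ts <= ub]

    def newest(self):
        # Abstract time attribute: the most recent item, if any.
        with self._lock:
            return self._items[-1] if self._items else None

def detector(frames, blobs, lb, ub):
    # Consume camera frames, produce digests, and propagate temporal
    # causality by reusing each input frame's timestamp on the output.
    for ts, frame in frames.get(lb, ub):
        digest = f"blobs({frame})"       # stand-in for real detection
        blobs.put(digest, ts)

frames, blobs = Channel(), Channel()
frames.put("frame-1", ts=100)
frames.put("frame-2", ts=101)
detector(frames, blobs, lb=100, ub=101)
print(blobs.get(100, 101))   # [(100, 'blobs(frame-1)'), (101, 'blobs(frame-2)')]
```

The key line is the put that reuses the input timestamp: that is the temporal-causality propagation described above.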

Bundling Streams

Another common need in building such situation awareness applications is that a computation may need to get correspondingly timestamped items from several different sensor sources. For instance, in this picture I'm showing you a video source, an audio source, a text source, and a gesture source. This computation, in order to do its high-level inferencing, wants to use the multiple modalities of sensing that are available, to generate a high-level inference that can be more robust. This is where the bundling of streams that PTS allows comes in handy. PTS allows streams to be grouped together as a stream group; any number of streams can be grouped together and labeled as a group. In creating a stream bundle like this, a computation can specify which stream is the anchor stream for the stream group. For instance, we could name the video as the anchor stream for this particular stream group. All the other streams within the same stream group are dependent streams on the anchor stream. And PTS has a primitive for doing a group get, that is, getting correspondingly timestamped items from all of the streams in a particular stream group. The group get primitive is once again a way of reducing the pain points for a domain expert, who would otherwise have to go out and fetch individual items from each one of these streams and select the temporally correlated items from all the dependent streams. That burden is taken away by PTS by providing this group get, where you can get correspondingly timestamped items from all the streams in a given group.
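Here is a toy sketch of a stream group and a group get, reusing the Channel class from the earlier sketch. The StreamGroup class and the group_get signature are illustrative assumptions, not the actual PTS API; this sketch also sidesteps the question of dependent items whose timestamps are close but not identical by matching exact timestamps.

```python
# Toy stream group: one anchor stream plus named dependent streams.

class StreamGroup:
    def __init__(self, anchor, **dependents):
        self.anchor = anchor             # e.g., the video channel
        self.dependents = dependents     # e.g., audio, text, gesture channels

    def group_get(self, lb, ub):
        # For each anchor item, fetch the correspondingly timestamped
        # item from every dependent stream in the group.
        bundles = []
        for ts, item in self.anchor.get(lb, ub):
            bundle = {"anchor": item}
            for name, ch in self.dependents.items():
                hits = ch.get(ts, ts)    # items carrying the same timestamp
                bundle[name] = hits[0][1] if hits else None
            bundles.append((ts, bundle))
        return bundles

video, audio = Channel(), Channel()
video.put("vframe", ts=7)
audio.put("asample", ts=7)
group = StreamGroup(video, audio=audio)
print(group.group_get(7, 7))   # [(7, {'anchor': 'vframe', 'audio': 'asample'})]
```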

Power of Simplicity

I mentioned that any system design should be simple, and the power of simplicity is the key to adoption. So let's revisit the sequential program for video analytics that I showed you earlier. Converting the sequential program for video analytics into a distributed program, using the get/put primitives provided by PTS, is fairly straightforward. What you do is interpose, between the computations that are there in the original pipeline, channels: named entities that hold the temporal evolution of the output from a particular computation. In this case, camera capture is a thread, and this capture computation captures images from a camera with a certain periodicity and places them into a named channel called frames. So the frames channel is going to contain the temporal evolution of the output produced by the capture computation. The detector can discover where this frames channel is, connect to it, get images from the channel, process them, and produce blobs that characterize the objects it sees in any given frame; this is a temporal evolution of the detector's output corresponding to the frames it sees. The tracker takes these blobs, picks out the interesting blobs corresponding to the objects it is tracking, produces a temporal evolution of the locations of the objects it sees, and places it in its output channel. A recognizer may then look at these objects and consult a database of known objects, to see whether any of the objects that are being continuously captured, detected, and tracked correspond to anything that may indicate an anomalous situation. Those are the events it produces, and those events may trigger an alarm. So basically, what I'm showing you here is converting the sequential program for video analytics into a distributed program using the channel abstraction and the get/put primitives available in the PTS programming model. The ovals are the threads of the PTS abstraction, and the rectangles are the channels between any two computational entities.
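A sketch of that conversion, building on the toy Channel from earlier: each oval becomes a stage function (in a real deployment, a thread), and the named channels frames, blobs, objects, and events sit between them. The stage logic is again a stand-in for the real algorithms.

```python
# Distributed form of the pipeline: stages communicate only via channels.

def capture_stage(frames, camera_input):
    for ts, frame in camera_input:
        frames.put(frame, ts)

def detector_stage(frames, blobs, lb, ub):
    for ts, frame in frames.get(lb, ub):
        blobs.put(f"blobs({frame})", ts)         # stand-in detection

def tracker_stage(blobs, objects, lb, ub):
    for ts, b in blobs.get(lb, ub):
        objects.put(f"objects({b})", ts)         # stand-in tracking

def recognizer_stage(objects, events, known, lb, ub):
    for ts, obj in objects.get(lb, ub):
        if any(k in obj for k in known):         # consult known-object database
            events.put(f"alarm({obj})", ts)

frames, blobs, objects, events = Channel(), Channel(), Channel(), Channel()
capture_stage(frames, [(0, "suspect"), (1, "nothing")])
detector_stage(frames, blobs, 0, 1)
tracker_stage(blobs, objects, 0, 1)
recognizer_stage(objects, events, {"suspect"}, 0, 1)
print(events.get(0, 1))   # [(0, 'alarm(objects(blobs(suspect)))')]
```

Note that the sequential logic is untouched; only the plumbing between stages changed, which is exactly the ease-of-conversion point being made here.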

PTS Design Principles

PTS provides a simple abstraction, namely the channel, and simple primitives to manipulate it, namely get and put. These channels can be anywhere in the distributed system. Just like UNIX sockets, they are named entities that are network-wide unique, and therefore you can have a large-scale distributed computation in which channels exist everywhere, and you can discover them, connect to them, and do I/O on them using get and put. All the heavy lifting, that is, the systems work that needs to happen in order to support this channel abstraction and the operations on it, like get and put, is managed under the covers by the runtime system of PTS. So PTS channels can be anywhere, just like UNIX sockets; they can be accessed from anywhere, and they are network-wide unique. That is the similarity of the channels to UNIX sockets, in terms of their ubiquity in the entire distributed system.

But what makes PTS channels particularly attractive for situation awareness applications is that the runtime system, and the APIs provided for manipulating the channel, treat time as a first-class entity. What we mean by that is that time is manipulable by the application, both in the way it gives items to the runtime system and in the way it queries the runtime system using time as an index into the channel. That is what is unique about PTS channels.

The second unique property of the PTS abstraction is that it allows streams to be persistent under application control. Remember that these sensors are producing data continuously, and all of it obviously cannot be held in memory, so it has to be persisted on more archival storage like a disk; PTS provides the ability to persist streams under application control. The flip side of persisting streams is that when a get comes in for a particular channel, asking for items from a lower bound to an upper bound, the lower bound may be yesterday, which means we are asking for data that are historical in nature in addition to live data. The PTS runtime system and the semantics of channels allow seamlessly handling live and historical data. The primitives are just get and put, whether we are accessing data that is current or historical: you give a lower-bound marker and an upper-bound marker, and the runtime system gets to work doing whatever heavy lifting needs to be done to bring you the data you are looking for between those markers.

That is the power of the PTS channel primitives. They are simple to use, but at the same time they provide handles that make the life of the domain expert developing a situation awareness application so much easier: time can be manipulated as a first-class entity recognized by the programming system; data can persist beyond the lifetime of a computation, by moving to archival storage under application control; and live and historical data are seamlessly integrated.
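As a small illustration of the uniform get, again using the toy Channel: the same primitive and the same bound markers are used whether the requested items are live or a day old. In real PTS the runtime would fetch the older items from an archival backend transparently; the channel name here is hypothetical.

```python
# One get spanning historical and live data with the same primitive.

import time

incidents = Channel()                               # hypothetical per-highway channel
now = time.time()
incidents.put("car 42 speeding", now - 20 * 3600)   # "historical" item
incidents.put("car 42 speeding", now)               # live item

# Lower bound a day in the past: both items come back from one call.
print(incidents.get(now - 24 * 3600, now))
```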

Persistent Channel Architecture

Having given you the abstractions in PTS and the simplicity of the programming model, I will now introduce you to the heavy lifting that needs to happen under the covers in order to support this simple programming model from the point of view of the domain expert developing a situation awareness application. All the computations in the application can be considered as either producers or consumers of data: producers are putting things into the system, and consumers are getting stuff from the system. Under the covers, there are worker threads in the runtime system of PTS that react to the put and get calls coming from the producers and consumers. For instance, whenever a producer puts an item, that results in new-item triggers that are generated by the worker threads to the rest of the implementation.

The implementation of the channel architecture is a three-layer architecture. The top layer is the live channel layer, and this is the layer that reacts to the new-item triggers coming from the worker threads working on behalf of a put call from a producer. The live channel layer is the one holding a snapshot of the items that have been generated on a particular channel, starting from the oldest item in the channel to the newest item that just came in because of a new-item trigger.

At the time of creation of a channel, the creator could specify what the semantics of the data kept in this channel are. In particular, the creator of a channel could say that what it wants the channel to contain are live data corresponding to a certain snapshot of real time, from oldest to newest. For instance, I could say: keep only the last 30 seconds of data in the channel; the rest of it you can throw away. So there are garbage collection (GC) triggers, part of the runtime system, that look at the information in the channel and determine which items have become old and therefore can be thrown away. The GC triggers move data that have become ancient, so far as this channel is concerned, into a garbage list, meaning that these items are no longer relevant from the point of view of this application and can be garbage collected. There is another garbage collection thread that is responsible for periodically cleaning up all the garbage that has been created and throwing away the stuff that is no longer relevant for this computation. So this is all the channel bookkeeping that happens under the covers in support of an application using the PTS library.

But there is a lot more to it than just dealing with live data and data that is no longer relevant and has to be garbage collected. As I mentioned, one of the features of the PTS architecture is that an application can choose to keep data for as long as it wants; that is the persistence property supported by the PTS runtime system. Once again through the properties of the channel, an application programmer could specify: I don't want to throw away stuff that becomes too old to keep in the channel; I want to archive it, I want to persist it. If those properties have been associated with the channel, then when items go past the window that is to be stored in the live channel layer, the live channel layer generates what are called persistence triggers, to indicate that some items have become old in this channel and have to be persisted.

The second main functional layer in the channel architecture is the persistence layer; the interaction layer in between is just a go-between connecting the live channel layer and the persistence layer. What the persistence layer does, based on the persistence triggers it gets from the live channel layer, is take items from the channel and decide how to persist them. Here again, the application can have a say in how items are to be persisted, and it does that through a pickling handler. That is, the application can specify: here is a function that I want you to use every time you decide to persist an item from the channel. For example, an application may specify: don't store all the images as-is on archival storage, but condense them in such-and-such fashion. That is a function the application can supply, and the runtime system, when it performs persistence, will automatically apply the application-specified function on the items that need to be persisted, to create a digest, which is what will then be persisted.

Items that need to be persisted necessarily have to go to non-volatile storage devices, and here again the PTS architecture supports several different configured backends to store such items. The backend layer is the third layer in the channel architecture. PTS supports several different backends for the persistence activities, and it is an application choice which backend it wants to use for its specific application. The backends supported by PTS are MySQL, the Unix file system, and GPFS, a file system from IBM. So MySQL, the Unix file system, and GPFS are the three different backends available to the persistence layer to store channel data that needs to be archived for later retrieval.

The nice property is that all of the persistence activity happens unbeknownst to the user. All that the user has done, at the creation of a channel, is specify certain properties to associate with that channel. For example, the properties associated with a channel might be: persist on storage any items older than the last 30 seconds, and when you persist them, apply this function. Those are the things specified by the user at channel creation time. Once that is done, the heavy lifting that needs to happen at runtime is all handled under the covers by the runtime system of the channel architecture, which takes items from the channel, pickles them using the function that has been specified, and uses one of the configured backends to push those pickled items onto persistent storage.

On the other side, when an application wants to get items, the requested item range may span from something that is in the live channel part of the three-layer architecture to something that is on archival storage. It is up to the runtime system to retrieve all the items between the lower bound and the upper bound specified by the get primitive. So a get that spans both live and archived items results in get triggers being passed from the live channel layer through the interaction layer to the backend, so that, corresponding to the get interval specified, the backend layer can pull the data from archival storage and pass it up until it finally gets to the application.

What I wanted to illustrate through this picture is that there is a lot of heavy lifting that needs to happen in order to support a simple programming model. The programming model is very simple, but in order to support that simplicity, all of the heavy lifting has to be absorbed under the covers in the runtime system of the PTS programming model.
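To make the division of labor concrete, here is a toy extension of the earlier Channel sketch that models this behavior: a live window, a pickling handler supplied by the application, an in-memory list standing in for a configured backend (MySQL, the Unix file system, or GPFS in real PTS), and a get that spans live and archived items. It compresses the three-layer architecture into one class purely for illustration; nothing here is the real PTS implementation.

```python
# Toy model of the persistent channel: live window + pickling + backend.

class PersistentChannel(Channel):
    def __init__(self, window, pickle_fn=None):
        super().__init__()
        self.window = window                     # e.g., keep last 30 seconds live
        self.pickle_fn = pickle_fn or (lambda item: item)
        self.backend = []                        # stand-in for archival storage

    def put(self, item, ts):
        super().put(item, ts)
        cutoff = ts - self.window
        with self._lock:
            aged = [(t, i) for t, i in self._items if t < cutoff]
            self._items = [(t, i) for t, i in self._items if t >= cutoff]
        # "Persistence trigger": pickle aged-out items, push to the backend.
        self.backend.extend((t, self.pickle_fn(i)) for t, i in aged)

    def get(self, lb, ub):
        # A get transparently spans live items and archived items.
        live = super().get(lb, ub)
        archived = [(t, i) for t, i in self.backend if lb <= t <= ub]
        return sorted(archived + live)

ch = PersistentChannel(window=30, pickle_fn=lambda img: f"digest({img})")
ch.put("frame-a", ts=0)
ch.put("frame-b", ts=100)   # frame-a ages out of the window and is pickled
print(ch.get(0, 100))       # [(0, 'digest(frame-a)'), (100, 'frame-b')]
```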


Just as MapReduce provides a simple and intuitive programming model for the domain expert to develop big data applications, PTS provides a simple programming model for the domain expert to develop live stream analysis applications. Time-based distributed data structures for streams, automatic data management, and transparent stream persistence: these are the unique features of the PTS programming model that facilitate live stream analysis. I invite you to read the paper in its entirety to understand the many systems challenges that are identified and solved by PTS in providing this programming model. This completes our discussion in this course on the topic of real-time and multimedia.