From Wikipedia - "Streaming algorithms are algorithms for processing data streams in which the input is presented as a sequence of items and can be examined in only a few passes (typically just one). These algorithms have limited memory available to them (much less than the input size) and also limited processing time per item."
More formally, the input is a sequence S = <a1, a2, . . . , am>, where the elements of the sequence (called tokens) are drawn from the universe [n] := {1, 2, . . . , n}. Note the two important size parameters: the stream length, m, and the universe size, n. The goal is to process the stream using a small amount of space s, measured in bits of working memory. Since m and n are to be thought of as “huge,” we want to make s much smaller than these; specifically, we want s to be sublinear in both m and n. The holy grail is to achieve:
s = O(log m + log n)
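As a small illustration of what such a bound looks like in practice, here is a minimal sketch of a one-pass computation whose entire state is two numbers: the stream length (about log m bits) and the largest token seen (about log n bits). The class name OnePassStats and the sample stream are made up for this example.

```java
// Hypothetical illustration: one pass over the stream, two variables of state.
class OnePassStats {
    long length = 0;   // stream length m so far: needs about log2(m) bits
    long maxToken = 0; // largest token seen: needs about log2(n) bits
                       // (tokens come from {1..n}, so 0 means "nothing seen yet")

    // Process one token; the token itself is not stored.
    void offer(long token) {
        length++;
        if (token > maxToken) {
            maxToken = token;
        }
    }

    public static void main(String[] args) {
        OnePassStats stats = new OnePassStats();
        long[] stream = {3, 1, 4, 1, 5, 9, 2, 6};
        for (long token : stream) {
            stats.offer(token);
        }
        // prints "m = 8, max = 9"
        System.out.println("m = " + stats.length + ", max = " + stats.maxToken);
    }
}
```

Most interesting questions, such as finding frequent items, cannot be answered exactly in this little space, which is why the algorithms below keep a small summary and accept some approximation.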
1. Finding Frequent Items
We have a stream S = <a1, . . . , am>, with each ai in [n]. This implicitly defines a frequency vector f = (f1, . . . , fn), where fj is the number of occurrences of j in the stream. Note that f1 + · · · + fn = m. In the MAJORITY problem, the task is as follows:
if there exists a j such that fj > m/2, then output j; otherwise, output “No”. This can be generalized to the FREQUENT problem, with parameter k, as follows: output the set { j : fj > m/k }.
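To make the definitions concrete, here is a minimal sketch that computes the answer exactly by materializing the whole frequency vector. It is not a streaming algorithm, since it uses space linear in n, but it shows what MAJORITY (k = 2) and FREQUENT are asking for. The class and method names are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical exact baseline: stores the full frequency vector f.
class ExactFrequent {
    // Returns { j : f_j > m/k } for a stream of tokens drawn from {1..n}.
    static List<Integer> frequent(int[] stream, int n, int k) {
        long[] f = new long[n + 1];          // f[j] = occurrences of token j (index 0 unused)
        for (int token : stream) {
            f[token]++;
        }
        long m = stream.length;
        List<Integer> result = new ArrayList<>();
        for (int j = 1; j <= n; j++) {
            if (f[j] * k > m) {              // same as f_j > m/k, without integer division
                result.add(j);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        int[] stream = {1, 2, 1, 3, 1, 2, 1};         // m = 7, f = (4, 2, 1)
        System.out.println(frequent(stream, 3, 2));   // MAJORITY: prints [1]
        System.out.println(frequent(stream, 3, 4));   // FREQUENT, k = 4: prints [1, 2]
    }
}
```

For MAJORITY, an empty result corresponds to the answer “No”.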
Clearspring has open-sourced a library, "Stream-lib", that is well suited to summarizing streams, finding frequent items, and estimating cardinality (counting distinct elements). Here is sample code to find the top 100 elements in a stream.
import java.util.List;
import com.clearspring.analytics.stream.Counter;
import com.clearspring.analytics.stream.StreamSummary;

public static void main(String[] args) {
    // Summary with a capacity of 1000 counters
    StreamSummary<String> topk = new StreamSummary<String>(1000);
    /* Read product(s) id from console or a stream e.g.
     * 300645, 301482, 286467, 282697, 282697, 301482, 286467, .....
     */
    List<String> productIds = readProductId(); // helper not shown here
    for (String productId : productIds) {
        topk.offer(productId);
    }
    // Print rank, item, estimated count and error bound for the top 100
    long count = 0;
    List<Counter<String>> counters = topk.topK(100);
    for (Counter<String> counter : counters) {
        System.out.println(String.format("%d %s %d %d", ++count,
                counter.getItem(), counter.getCount(),
                counter.getError()));
    }
}
Sample Output:
Count ProductId Frequency Error
1 300645 231 0
2 282697 221 0
3 301482 105 0
4 295059 59 0
5 286467 58 0
....
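Finding frequent items and finding the Top K elements are related problems. The code above uses the Space-Saving algorithm, which is deterministic. With k counters, it guarantees that every item with frequency greater than m/k is in the summary, and each reported count overestimates the true frequency by at most the reported error. An error of 0, as in the sample output, means the reported count is exact. The Top K elements and their order are guaranteed to be correct only when the error bounds leave no ambiguity between neighboring counts.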
The algorithm works like this: the stream is processed one item at a time, and a collection of at most k distinct items and their associated counters is maintained. If the item is already in the collection, its counter is increased by 1. If a new item is encountered and fewer than k items are in the collection, then the item is added and its counter is set to 1. If the item is not in the collection and the collection already has a size of k, then the item with the lowest counter is removed and the new item is added, with its counter set to one larger than the previous minimum counter.
Here is some pseudo code to make this clearer:
SpaceSaving(k, stream):
    collection = empty collection
    for each element in stream:
        if element in collection:
            then collection[element] += 1
        else if length of collection < k:
            then add element to collection, collection[element] = 1
        else:
            current_minimum_element = element with lowest count value in collection
            current_minimum = collection[current_minimum_element]
            remove current_minimum_element from collection
            collection[element] = current_minimum + 1
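The pseudocode above can be translated fairly directly into Java. The following is a minimal sketch, not the Stream-lib implementation: the class name SpaceSavingSketch is made up for this example, and it finds the minimum with a linear scan over the k counters, which keeps the code short but costs O(k) per eviction. A production implementation would typically keep the counters in a structure that makes the minimum cheap to find.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified Space-Saving summary following the pseudocode.
class SpaceSavingSketch<T> {
    private final int k;                                   // maximum number of counters
    private final Map<T, Long> counters = new HashMap<>();

    SpaceSavingSketch(int k) {
        if (k < 1) {
            throw new IllegalArgumentException("k must be at least 1");
        }
        this.k = k;
    }

    void offer(T item) {
        Long current = counters.get(item);
        if (current != null) {
            counters.put(item, current + 1);               // already monitored: increment
        } else if (counters.size() < k) {
            counters.put(item, 1L);                        // room left: start at 1
        } else {
            // Summary is full: find the item with the lowest count (linear scan).
            T minItem = null;
            long min = Long.MAX_VALUE;
            for (Map.Entry<T, Long> e : counters.entrySet()) {
                if (e.getValue() < min) {
                    min = e.getValue();
                    minItem = e.getKey();
                }
            }
            // Replace it; the newcomer inherits the evicted count plus one.
            counters.remove(minItem);
            counters.put(item, min + 1);
        }
    }

    Map<T, Long> counters() {
        return counters;
    }

    public static void main(String[] args) {
        SpaceSavingSketch<String> summary = new SpaceSavingSketch<>(2);
        for (String item : new String[] {"a", "b", "a", "c", "a"}) {
            summary.offer(item);
        }
        // "c" evicted "b" (count 1) and started at 2, so "a" has 3 and "c" has 2.
        System.out.println("a -> " + summary.counters().get("a"));
        System.out.println("c -> " + summary.counters().get("c"));
    }
}
```

Note that "c" appeared only once but is reported with a count of 2. This is the overestimation that the error value in the earlier sample output accounts for. A useful sanity check is that the counters always sum to the number of items processed so far.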
Finding frequent elements has applications in network traffic monitoring, anomaly detection and DDoS detection. In the case of DDoS detection, the top few most frequent IPs are continuously maintained for further action. The number of possible IP addresses is large, but typically only a subset of them is seen in an attack. In such cases, the space bound of the Space-Saving algorithm is a function of the number of distinct IPs that have occurred in the stream.
In part 2, we will look at counting distinct elements and other stream algorithms.