Word count in HDFS
==================
Setup
-----
In this example, we'll use ``distributed`` with the ``hdfs3`` library to count
the number of words in text files (Enron email dataset, 6.4 GB) stored in HDFS.
Copy the text data from Amazon S3 into HDFS on the cluster:
.. code-block:: bash

    $ hadoop distcp s3n://AWS_SECRET_ID:AWS_SECRET_KEY@blaze-data/enron-email hdfs:///tmp/enron
where ``AWS_SECRET_ID`` and ``AWS_SECRET_KEY`` are valid AWS credentials.
Start the ``distributed`` scheduler and workers on the cluster.
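One way to do this, sketched here with the command-line tools and placeholder
hostnames (``dask-scheduler`` listens on port 8786 by default):

.. code-block:: bash

    $ dask-scheduler                  # on the scheduler node
    $ dask-worker SCHEDULER_IP:8786   # on each worker node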
Code example
------------
Import ``distributed``, ``hdfs3``, and other standard libraries used in this example:
.. code-block:: python

    >>> import hdfs3
    >>> from collections import defaultdict, Counter
    >>> from distributed import Client, progress
Initialize a connection to HDFS, replacing ``NAMENODE_HOSTNAME`` and
``NAMENODE_PORT`` with the hostname and port (default: 8020) of the HDFS
namenode.
.. code-block:: python

    >>> hdfs = hdfs3.HDFileSystem('NAMENODE_HOSTNAME', port=NAMENODE_PORT)
Initialize a connection to the ``distributed`` client, replacing
``SCHEDULER_IP`` and ``SCHEDULER_PORT`` with the IP address and port of the
``distributed`` scheduler.
.. code-block:: python

    >>> client = Client('SCHEDULER_IP:SCHEDULER_PORT')
Generate a list of filenames from the text data in HDFS:
.. code-block:: python

    >>> filenames = hdfs.glob('/tmp/enron/*/*')
    >>> print(filenames[:5])
    ['/tmp/enron/edrm-enron-v2_nemec-g_xml.zip/merged.txt',
     '/tmp/enron/edrm-enron-v2_ring-r_xml.zip/merged.txt',
     '/tmp/enron/edrm-enron-v2_bailey-s_xml.zip/merged.txt',
     '/tmp/enron/edrm-enron-v2_fischer-m_xml.zip/merged.txt',
     '/tmp/enron/edrm-enron-v2_geaccone-t_xml.zip/merged.txt']
Print the first 1024 bytes of the first text file:
.. code-block:: python

    >>> print(hdfs.head(filenames[0]))
    b'Date: Wed, 29 Nov 2000 09:33:00 -0800 (PST)\r\nFrom: Xochitl-Alexis Velasc
    o\r\nTo: Mark Knippa, Mike D Smith, Gerald Nemec, Dave S Laipple, Bo Barnwel
    l\r\nCc: Melissa Jones, Iris Waser, Pat Radford, Bonnie Shumaker\r\nSubject:
    Finalize ECS/EES Master Agreement\r\nX-SDOC: 161476\r\nX-ZLID: zl-edrm-enro
    n-v2-nemec-g-2802.eml\r\n\r\nPlease plan to attend a meeting to finalize the
    ECS/EES Master Agreement \r\ntomorrow 11/30/00 at 1:30 pm CST.\r\n\r\nI wi
    ll email everyone tomorrow with location.\r\n\r\nDave-I will also email you
    the call in number tomorrow.\r\n\r\nThanks\r\nXochitl\r\n\r\n***********\r\n
    EDRM Enron Email Data Set has been produced in EML, PST and NSF format by ZL
    Technologies, Inc. This Data Set is licensed under a Creative Commons Attri
    bution 3.0 United States License . To provide attribution, please cite to "ZL Technologies, Inc. (http:
    //www.zlti.com)."\r\n***********\r\nDate: Wed, 29 Nov 2000 09:40:00 -0800 (P
    ST)\r\nFrom: Jill T Zivley\r\nTo: Robert Cook, Robert Crockett, John Handley
    , Shawna'
Create a function to count words in each file:
.. code-block:: python

    >>> def count_words(fn):
    ...     word_counts = defaultdict(int)
    ...     with hdfs.open(fn) as f:
    ...         for line in f:
    ...             for word in line.split():
    ...                 word_counts[word] += 1
    ...     return word_counts
Before we process all of the text files using the distributed workers, let's
test our function locally by counting the number of words in the first text
file:
.. code-block:: python

    >>> counts = count_words(filenames[0])
    >>> print(sorted(counts.items(), key=lambda k_v: k_v[1], reverse=True)[:10])
    [(b'the', 144873),
     (b'of', 98122),
     (b'to', 97202),
     (b'and', 90575),
     (b'or', 60305),
     (b'in', 53869),
     (b'a', 43300),
     (b'any', 31632),
     (b'by', 31515),
     (b'is', 30055)]
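Note that ``hdfs3`` opens files in binary mode, so the counted words above are
``bytes`` objects. If ``str`` keys are preferred, a hypothetical variant (not
part of the original example) could decode each line before splitting:

.. code-block:: python

    >>> def count_words_str(fn):
    ...     # Hypothetical variant of count_words that decodes bytes to str
    ...     word_counts = defaultdict(int)
    ...     with hdfs.open(fn) as f:
    ...         for line in f:
    ...             for word in line.decode('utf-8', 'replace').split():
    ...                 word_counts[word] += 1
    ...     return word_counts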
We can perform the same word count on the first text file, except this time we
will use ``client.submit`` to execute the computation on a ``distributed``
worker:
.. code-block:: python

    >>> future = client.submit(count_words, filenames[0])
    >>> counts = future.result()
    >>> print(sorted(counts.items(), key=lambda k_v: k_v[1], reverse=True)[:10])
    [(b'the', 144873),
     (b'of', 98122),
     (b'to', 97202),
     (b'and', 90575),
     (b'or', 60305),
     (b'in', 53869),
     (b'a', 43300),
     (b'any', 31632),
     (b'by', 31515),
     (b'is', 30055)]
We are ready to count the number of words in all of the text files using
``distributed`` workers. Note that the ``map`` operation is non-blocking, and
you can continue to work in the Python shell/notebook while the computations
are running.
.. code-block:: python

    >>> futures = client.map(count_words, filenames)
We can check the status of some ``futures`` while all of the text files are
being processed:
.. code-block:: python

    >>> len(futures)
    161
    >>> futures[:5]
    [<Future: status: finished, key: count_words-...>,
     <Future: status: finished, key: count_words-...>,
     <Future: status: finished, key: count_words-...>,
     <Future: status: finished, key: count_words-...>,
     <Future: status: finished, key: count_words-...>]
    >>> progress(futures)
    [########################################] | 100% Completed | 3min 0.2s
When the ``futures`` finish reading in all of the text files and counting
words, the results will exist on each worker. This operation required about
3 minutes to run on a cluster with three worker machines, each with 4 cores
and 16 GB RAM.
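To see where each piece lives, ``Client.who_has`` maps a future's key to the
worker(s) holding its result. A quick check might look like this (the output
shape is sketched with placeholder addresses):

.. code-block:: python

    >>> client.who_has(futures[:2])
    {'count_words-...': ('tcp://WORKER_IP:PORT',),
     'count_words-...': ('tcp://WORKER_IP:PORT',)}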
Note that because the previous computation is bound by the GIL in Python, we
can speed it up by starting the ``distributed`` workers with the
``--nworkers 4`` option.
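For example (the scheduler address below is a placeholder, and 8786 is the
default scheduler port):

.. code-block:: bash

    $ dask-worker SCHEDULER_IP:8786 --nworkers 4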
To sum the word counts for all of the text files, we need to gather some
information from the ``distributed`` workers. To reduce the amount of data
that we gather from the workers, we can define a function that only returns the
top 10,000 words from each text file.
.. code-block:: python

    >>> def top_items(d):
    ...     items = sorted(d.items(), key=lambda kv: kv[1], reverse=True)[:10000]
    ...     return dict(items)
We can then ``map`` the futures from the previous step to this culling
function. This is a convenient way to construct a pipeline of computations
using futures:
.. code-block:: python

    >>> futures2 = client.map(top_items, futures)
We can ``gather`` the resulting culled word count data for each text file to
the local process:
.. code-block:: python

    >>> results = client.gather(iter(futures2))
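As an aside, the same gather-and-reduce pattern can be written with
``distributed.as_completed``, which yields each future as it finishes. A sketch
(using a separate ``totals`` counter so it does not clash with the steps below):

.. code-block:: python

    >>> from distributed import as_completed
    >>> totals = Counter()
    >>> for future in as_completed(futures2):
    ...     totals.update(future.result())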
To sum the word counts for all of the text files, we can iterate over the
gathered ``results`` and update a local ``Counter`` that accumulates all of the
word counts.
.. code-block:: python

    >>> all_counts = Counter()
    >>> for result in results:
    ...     all_counts.update(result)
Finally, we print the total number of words in the results and the words with
the highest frequency from all of the text files:
.. code-block:: python

    >>> print(len(all_counts))
    8797842
    >>> print(sorted(all_counts.items(), key=lambda k_v: k_v[1], reverse=True)[:10])
    [(b'0', 67218380),
     (b'the', 19586868),
     (b'-', 14123768),
     (b'to', 11893464),
     (b'N/A', 11814665),
     (b'of', 11724827),
     (b'and', 10253753),
     (b'in', 6684937),
     (b'a', 5470371),
     (b'or', 5227805)]
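Because ``all_counts`` is a ``Counter``, the same top-10 view is also available
directly from its ``most_common`` method:

.. code-block:: python

    >>> all_counts.most_common(10)  # equivalent to the sorted(...) expression above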
The complete Python script for this example is shown below:
.. code-block:: python

    # word-count.py

    import hdfs3
    from collections import defaultdict, Counter
    from distributed import Client
    from distributed.diagnostics.progressbar import progress

    hdfs = hdfs3.HDFileSystem('NAMENODE_HOSTNAME', port=NAMENODE_PORT)
    client = Client('SCHEDULER_IP:SCHEDULER_PORT')

    filenames = hdfs.glob('/tmp/enron/*/*')
    print(filenames[:5])
    print(hdfs.head(filenames[0]))

    def count_words(fn):
        word_counts = defaultdict(int)
        with hdfs.open(fn) as f:
            for line in f:
                for word in line.split():
                    word_counts[word] += 1
        return word_counts

    counts = count_words(filenames[0])
    print(sorted(counts.items(), key=lambda k_v: k_v[1], reverse=True)[:10])

    future = client.submit(count_words, filenames[0])
    counts = future.result()
    print(sorted(counts.items(), key=lambda k_v: k_v[1], reverse=True)[:10])

    futures = client.map(count_words, filenames)
    print(len(futures))
    print(futures[:5])
    progress(futures)

    def top_items(d):
        items = sorted(d.items(), key=lambda kv: kv[1], reverse=True)[:10000]
        return dict(items)

    futures2 = client.map(top_items, futures)
    results = client.gather(iter(futures2))

    all_counts = Counter()
    for result in results:
        all_counts.update(result)

    print(len(all_counts))
    print(sorted(all_counts.items(), key=lambda k_v: k_v[1], reverse=True)[:10])