cc-mrjob: Not working anymore on EMR? "subprocess failed with code 1"

Hi, I'm having problems running the MR jobs on EMR. The scripts work locally, and they work with 1 or 10 WARC files on EMR, but with 100 I always get failures of the type “PipeMapRed.waitOutputThreads(): subprocess failed with code 1”. I start the script with the following command:

python email_counter_emr.py -r emr --jobconf mapreduce.task.timeout=3600000 --conf-path mrjob.conf --no-output --output-dir=s3://tw-mapreduce/emailscan-8/ input/test-100.warc

NOTE: I use the 2017-10-16 Common Crawl files in test-100.warc, e.g. crawl-data/CC-MAIN-2017-43/segments/1508187820466.2/warc/CC-MAIN-20171016214209-20171016234209-00000.warc.gz
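
For reference, a listing like input/test-100.warc can be generated from the crawl's warc.paths.gz file (a minimal sketch, assuming warc.paths.gz for CC-MAIN-2017-43 has already been downloaded into the working directory):

from itertools import islice
import gzip

# Take the first 100 WARC paths from the crawl's path listing and write them
# to the input file that is passed to the job.
with gzip.open('warc.paths.gz', 'rb') as paths, open('input/test-100.warc', 'wb') as out:
    out.writelines(islice(paths, 100))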

My mrjob.conf file:

runners:
  emr:
    region: us-west-2
    zone: us-west-2c

    instance_type: c3.8xlarge
    core_instance_bid_price: '1.0'
    num_core_instances: 10

    # We also install packages needed for streaming compressed files from S3 or reading WARC files
    image_version: 3.0.4   # There's a newer AMI version but it has issues with the released stable mrjob
    interpreter: python2.7 # EMR comes with Python 2.6 by default -- installing Python 2.7 takes a while but might be necessary
    bootstrap:
    - sudo yum --releasever=2014.09 install -y python27 python27-devel gcc-c++
    - sudo python2.7 get-pip.py
    - sudo pip2.7 install boto mrjob simplejson warc
    - sudo pip2.7 install https://github.com/commoncrawl/gzipstream/archive/master.zip

My MR script email_counter_emr.py, which identifies emails (based on tag_counter.py, but shortened):

#!/usr/bin/env python

import gzip
import logging
import os.path as Path
import warc
import boto
from boto.s3.key import Key
from gzipstream import GzipStreamFile
from mrjob.job import MRJob
from mrjob.util import log_to_stream

import re
from collections import Counter

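# Note: Python's re module does not support \p{L}; matching non-ASCII letters
# would require the third-party regex module instead.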
EMAIL_PATTERN = re.compile(r"\b[a-z0-9\p{L}\.\_\%\+\-]+@[a-z0-9\p{L}\.\-]+\.[a-z]{2,}\b")
FILE_PATTERN  = re.compile(r".*\.(jpe?g|a?png|gif|tiff?|bmp|ico|dxf|webp|svg|eps|pdf|html?|css|js|py|downloads?|hidden|stream|chnla|aspx|invalid|jira|comand|tld|funky|nix)$")

class EmailCounter(MRJob):
    def configure_args(self):
        super(EmailCounter, self).configure_args()
        self.pass_arg_through('--runner')
        self.pass_arg_through('-r')

    def process_record(self, record):
        # We're only interested in the HTTP responses
        if record['Content-Type'] != 'application/http; msgtype=response':
            return
        payload = record.payload.read()

        # The HTTP response is defined by a specification: first part is headers (metadata)
        # and then following two CRLFs (newlines) has the data for the response
        headers, body = payload.split('\r\n\r\n', 1)
        if 'Content-Type: text/html' not in headers:
            return
        for email in EMAIL_PATTERN.findall(body.lower()):
            # filter out files and dirty emails
            if not (FILE_PATTERN.match(email) or (".." in email)):
                yield (email, 1)
            else:
                yield "x@x.x", 0

    def combiner(self, word, counts):
        yield (word, sum(counts))

    def reducer(self, word, counts):
        yield (word, sum(counts))

    def mapper(self, _, line):
        # If we're on EC2 or running on a Hadoop cluster, pull files via S3
        if self.options.runner in ['emr', 'hadoop']:
            # Connect to Amazon S3 using anonymous credentials
            conn = boto.connect_s3(anon=True)
            pds = conn.get_bucket('commoncrawl')
            k = Key(pds, line)
            ccfile = warc.WARCFile(fileobj=GzipStreamFile(k))
        # If we're local, use files on the local file system
        else:
            line = Path.join(Path.abspath("/Users/jorgrech/repos/cc-mrjob"), line)
            ccfile = warc.WARCFile(fileobj=gzip.open(line))

        for _i, record in enumerate(ccfile):
            for key, value in self.process_record(record):
                yield (key, value)

if __name__ == '__main__':
    EmailCounter.run()
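
For debugging, I'm considering guarding the per-record loop so that a single malformed WARC record or failed S3 read doesn't kill the whole map task (just a sketch of the idea, not part of the script above; iter_record_results is a hypothetical helper that would replace the plain enumerate loop in mapper()):

import sys
import traceback

def iter_record_results(ccfile, process_record):
    """Yield (key, value) pairs, skipping records whose processing raises."""
    for i, record in enumerate(ccfile):
        try:
            for key_value in process_record(record):
                yield key_value
        except Exception:
            # Anything written to stderr ends up in the task attempt logs,
            # which is where the real cause behind "subprocess failed with
            # code 1" is otherwise buried.
            sys.stderr.write('failed to process record %d\n' % i)
            traceback.print_exc()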

The logs contain the following information.

syslog:

...
2017-11-05 19:40:37,023 INFO org.apache.hadoop.mapreduce.Job (main):  map 98% reduce 31%
2017-11-05 19:40:46,080 INFO org.apache.hadoop.mapreduce.Job (main):  map 99% reduce 31%
2017-11-05 19:45:18,145 INFO org.apache.hadoop.mapreduce.Job (main): Task Id : attempt_1509908316427_0001_m_000065_1, Status : FAILED
2017-11-05 19:45:19,149 INFO org.apache.hadoop.mapreduce.Job (main):  map 98% reduce 31%
2017-11-05 19:45:29,202 INFO org.apache.hadoop.mapreduce.Job (main):  map 99% reduce 31%
2017-11-05 19:49:52,766 INFO org.apache.hadoop.mapreduce.Job (main):  map 100% reduce 100%
2017-11-05 19:49:52,775 INFO org.apache.hadoop.mapreduce.Job (main): Job job_1509908316427_0001 failed with state FAILED due to: Task failed task_1509908316427_0001_m_000019
Job failed as tasks failed. failedMaps:1 failedReduces:0
...

stderr:

Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
	at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:330)
	at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:543)
	at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
	at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
	at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
	at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:433)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:344)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)

As you mention “run over batches of files” in the README, I was hoping you might have some leads on what could be wrong or how I can debug this problem. Running over ~8,900 files in batches of 10 WARC files (for the 2017-10-16 crawl) would be a little excessive and more expensive.

About this issue

  • State: closed
  • Created 7 years ago
  • Comments: 17

Most upvoted comments

The mrjob.conf is updated to use the us-east-1 region and to explicitly define cloud_tmp_dir, in order to avoid the “An error occurred (MalformedXML) when calling the CreateBucket operation” error.
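
The relevant part of the updated mrjob.conf would look roughly like this (the bucket name is a placeholder):

runners:
  emr:
    region: us-east-1
    cloud_tmp_dir: s3://my-output-bucket/tmp/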

I’ve verified the updated configuration by running

python tag_counter_emr.py -r emr --conf-path mrjob.conf --no-output --output-dir s3://my-output-bucket/cc-mrjob-test/ input/test-100.warc

as described in the README. The output dir and cloud_tmp_dir have been set appropriately: a bucket with write permissions, and an output dir that does not yet exist. Thanks!