#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Merge phone, email, and mailing address information.
A Dataflow pipeline that merges phone, email, and address information associated
with the same names. Each input "database" is a tab-delimited text file pairing
names with one phone number/email address/mailing address; multiple entries
associated with the same name are allowed. Outputs are a tab-delimited text file
with the merged information and another file containing some simple statistics.
See mergecontacts_test.py for example inputs and outputs.
A demonstration of:
CoGroupByKey
Non-linear pipelines (i.e., pipelines with branches)
"""

from __future__ import absolute_import

import argparse
import logging
import re

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

def run(argv=None, assert_results=None):
parser = argparse.ArgumentParser()
parser.add_argument(
'--input_email',
required=True,
help='Email database, with each line formatted as "name<TAB>email".')
parser.add_argument(
'--input_phone',
required=True,
help='Phonebook, with each line formatted as "name<TAB>phone number".')
parser.add_argument(
'--input_snailmail',
required=True,
help='Address database, with each line formatted as "name<TAB>address".')
parser.add_argument('--output_tsv',
required=True,
help='Tab-delimited output file.')
parser.add_argument('--output_stats',
required=True,
help='Output file for statistics about the input.')
  known_args, pipeline_args = parser.parse_known_args(argv)

  # We use the save_main_session option because one or more DoFns in this
  # workflow rely on global context (e.g., a module imported at module level).
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True

with beam.Pipeline(options=pipeline_options) as p:
    # Helper: read a tab-separated key-value mapping from a text file,
    # escape all quotes/backslashes, and convert it to a PCollection of
    # (key, value) pairs.
    def read_kv_textfile(label, textfile):
      return (p
              | 'Read: %s' % label >> ReadFromText(textfile)
              | 'Backslash: %s' % label >> beam.Map(
                  lambda x: re.sub(r'\\', r'\\\\', x))
              | 'EscapeQuotes: %s' % label >> beam.Map(
                  lambda x: re.sub(r'"', r'\"', x))
              | 'Split: %s' % label >> beam.Map(
                  lambda x: re.split(r'\t+', x, maxsplit=1)))
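    # Illustrative behavior (made-up input, not the example's test data): the
    # line 'Ann "Annie" Smith\t555-0100' comes out as the pair
    # ['Ann \"Annie\" Smith', '555-0100']: backslashes are doubled first,
    # double quotes then gain a backslash escape, and the split happens at
    # the first run of tabs.
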
# Read input databases.
email = read_kv_textfile('email', known_args.input_email)
phone = read_kv_textfile('phone', known_args.input_phone)
snailmail = read_kv_textfile('snailmail', known_args.input_snailmail)
# Group together all entries under the same name.
grouped = (email, phone, snailmail) | 'group_by_name' >> beam.CoGroupByKey()
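    # Each grouped element pairs a name with three iterables, one per input;
    # an illustrative shape (not taken from the example's test data):
    #   ('alice', (['alice@example.com'], ['555-0100'], []))
    # An iterable is empty when the name never appeared in that input file.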

    # Prepare tab-delimited output; something like this:
    # "name"<TAB>"email_1,email_2"<TAB>"phone"<TAB>"first_snailmail_only"
    # (kv is a (name, (emails, phones, snailmails)) tuple; Python 3 removed
    # tuple parameters in lambdas, so the fields are indexed explicitly.)
    tsv_lines = grouped | beam.Map(
        lambda kv: '\t'.join(
            ['"%s"' % kv[0],
             '"%s"' % ','.join(kv[1][0]),
             '"%s"' % ','.join(kv[1][1]),
             '"%s"' % next(iter(kv[1][2]), '')]))

    # Compute some stats about our database of people.
    luddites = grouped | beam.Filter(  # People without email.
        lambda kv: not next(iter(kv[1][0]), None))
    writers = grouped | beam.Filter(  # People without phones.
        lambda kv: not next(iter(kv[1][1]), None))
    nomads = grouped | beam.Filter(  # People without addresses.
        lambda kv: not next(iter(kv[1][2]), None))
num_luddites = luddites | 'Luddites' >> beam.combiners.Count.Globally()
num_writers = writers | 'Writers' >> beam.combiners.Count.Globally()
num_nomads = nomads | 'Nomads' >> beam.combiners.Count.Globally()
# Write tab-delimited output.
# pylint: disable=expression-not-assigned
tsv_lines | 'WriteTsv' >> WriteToText(known_args.output_tsv)
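
    # The required --output_stats flag is never consumed above, even though
    # the docstring promises a statistics file. A minimal sketch of writing
    # the three counts to it follows; the 'label: count' line format is an
    # assumption, not something this example specifies.
    ([num_luddites | 'FormatLuddites' >> beam.Map(lambda c: 'luddites: %d' % c),
      num_writers | 'FormatWriters' >> beam.Map(lambda c: 'writers: %d' % c),
      num_nomads | 'FormatNomads' >> beam.Map(lambda c: 'nomads: %d' % c)]
     | 'FlattenStats' >> beam.Flatten()
     | 'WriteStats' >> WriteToText(known_args.output_stats))
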
# TODO(silviuc): Move the assert_results logic to the unit test.
if assert_results is not None:
expected_luddites, expected_writers, expected_nomads = assert_results
assert_that(num_luddites, equal_to([expected_luddites]),
label='assert:luddites')
assert_that(num_writers, equal_to([expected_writers]),
label='assert:writers')
assert_that(num_nomads, equal_to([expected_nomads]),
label='assert:nomads')


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()