Skip to content

Commit 3b4c48a

Browse files
author
Hurshal Patel
committed
Merge pull request #6 from memsql/pipe_through_script
support piping load through a script
2 parents a557e49 + 616775e commit 3b4c48a

File tree

2 files changed

+52
-1
lines changed

2 files changed

+52
-1
lines changed

memsql_loader/execution/downloader.py

+45-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import pycurl
2+
import subprocess
23
import select
34
import sys
45
import threading
@@ -16,6 +17,7 @@
1617
import pywebhdfs.errors
1718

1819
DOWNLOAD_TIMEOUT = 30
20+
SCRIPT_EXIT_TIMEOUT = 30
1921

2022
class DownloadMetrics(object):
2123
def __init__(self, total_size):
@@ -97,6 +99,7 @@ def load(self, job, task, fifo):
9799
self.task = task
98100
self.fifo = fifo
99101
self.key = None
102+
self.script_proc = None
100103

101104
if task.data['scheme'] == 's3':
102105
self.is_anonymous = job.spec.source.aws_access_key is None or job.spec.source.aws_secret_key is None
@@ -173,7 +176,25 @@ def run(self):
173176
curl.setopt(pycurl.SSL_VERIFYPEER, 0)
174177
curl.setopt(pycurl.SSL_VERIFYHOST, 0)
175178
curl.setopt(pycurl.CONNECTTIMEOUT, 30)
176-
curl.setopt(pycurl.WRITEFUNCTION, self._write_to_fifo(target_file))
179+
180+
if self.job.spec.options.script is not None:
181+
self.script_proc = subprocess.Popen(
182+
["/bin/bash", "-c", self.job.spec.options.script],
183+
stdout=target_file.fileno(),
184+
stdin=subprocess.PIPE)
185+
186+
# check that script hasn't errored before downloading
187+
# NOTE: we wait here so that we can check if a script exits prematurely
188+
# if this is the case, we fail the job without requeueing
189+
time.sleep(1)
190+
if self.script_proc.poll() is not None:
191+
self.logger.error('Script `%s` exited prematurely with return code %d' % (self.job.spec.options.script, self.script_proc.returncode))
192+
raise WorkerException('Script `%s` exited prematurely with return code %d' % (self.job.spec.options.script, self.script_proc.returncode))
193+
194+
curl.setopt(pycurl.WRITEFUNCTION, self._write_to_fifo(self.script_proc.stdin))
195+
else:
196+
curl.setopt(pycurl.WRITEFUNCTION, self._write_to_fifo(target_file))
197+
177198
if self.task.data['scheme'] == 'hdfs':
178199
curl.setopt(pycurl.FOLLOWLOCATION, True)
179200

@@ -187,9 +208,32 @@ def run(self):
187208
# Catch HTTP client errors, e.g. 404:
188209
if status_code >= 400 and status_code < 500:
189210
raise WorkerException('HTTP status code %s for file %s' % (status_code, self.key.name))
211+
212+
# If we're piping data through a script, catch timeouts and return codes
213+
if self.script_proc is not None:
214+
self.script_proc.stdin.close()
215+
for i in range(SCRIPT_EXIT_TIMEOUT):
216+
if self.script_proc.poll() is not None:
217+
break
218+
219+
time.sleep(1)
220+
else:
221+
self.logger.error('Script `%s` failed to exit...killing' % self.job.spec.options.script)
222+
self.script_proc.kill()
223+
raise WorkerException('Script `%s` failed to exit after %d seconds' % (self.job.spec.options.script, SCRIPT_EXIT_TIMEOUT))
224+
225+
if self.script_proc.returncode != 0:
226+
self.logger.error('Script `%s` exited with return code %d' % (self.job.spec.options.script, self.script_proc.returncode))
227+
raise WorkerException('Script `%s` exited with return code %d' % (self.job.spec.options.script, self.script_proc.returncode))
190228
finally:
191229
with self.task.protect():
192230
self.task.stop_step('download')
231+
232+
if self.script_proc is not None and self.script_proc.returncode is not None:
233+
try:
234+
self.script_proc.kill()
235+
except OSError as e:
236+
self.logger.warn("Failed to kill script `%s`: %s" % (self.job.spec.options.script, str(e)))
193237
except pycurl.error as e:
194238
errno = e.args[0]
195239
if errno == pycurl.E_ABORTED_BY_CALLBACK and not self._should_exit:

memsql_loader/util/schema.py

+7
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import os
2+
import shlex
23
import urlparse
34
import voluptuous as V
45

@@ -65,6 +66,7 @@ def get_spec_validator():
6566
V.Required("file_id_column", default=None): V.Any(basestring, None),
6667
V.Required("non_local_load", default=False): bool,
6768
V.Required("duplicate_key_method", default="error"): V.Any("error", "replace", "ignore"),
69+
V.Required("script", default=None): V.Any(basestring, None)
6870
})
6971

7072
_db_schema = V.Schema({
@@ -187,4 +189,9 @@ def validate_spec(spec):
187189
if file_id_column in spec['options']['columns']:
188190
raise V.Invalid('options.columns can not contain the file_id_column, it will be filled in by MemSQL-Loader',
189191
path=[ 'options', 'columns' ])
192+
if spec.options.script is not None:
193+
try:
194+
shlex.split(spec.options.script)
195+
except ValueError as e:
196+
raise V.Invalid('options.script is invalid: %s' % str(e), path=[ 'options', 'script' ])
190197
return spec

0 commit comments

Comments
 (0)