[BEAM-8399] Add --hdfs_full_urls option (wip)
udim committed Nov 26, 2019
1 parent b446304 commit 342f239
Showing 3 changed files with 133 additions and 37 deletions.
79 changes: 52 additions & 27 deletions sdks/python/apache_beam/io/hadoopfilesystem.py
@@ -41,6 +41,7 @@

_HDFS_PREFIX = 'hdfs:/'
_URL_RE = re.compile(r'^' + _HDFS_PREFIX + r'(/.*)')
_FULL_URL_RE = re.compile(r'^' + _HDFS_PREFIX + r'/([^/]*)(/.*)')
_COPY_BUFFER_SIZE = 2 ** 16
_DEFAULT_BUFFER_SIZE = 20 * 1024 * 1024

@@ -115,42 +116,59 @@ def __init__(self, pipeline_options):
hdfs_host = hdfs_options.hdfs_host
hdfs_port = hdfs_options.hdfs_port
hdfs_user = hdfs_options.hdfs_user
self.full_urls = hdfs_options.hdfs_full_urls
else:
hdfs_host = pipeline_options.get('hdfs_host')
hdfs_port = pipeline_options.get('hdfs_port')
hdfs_user = pipeline_options.get('hdfs_user')
self.full_urls = pipeline_options.get('hdfs_full_urls', False)

if hdfs_host is None:
raise ValueError('hdfs_host is not set')
if hdfs_port is None:
raise ValueError('hdfs_port is not set')
if hdfs_user is None:
raise ValueError('hdfs_user is not set')
if not isinstance(self.full_urls, bool):
raise ValueError('hdfs_full_urls should be bool, got: %s' % self.full_urls)
self._hdfs_client = hdfs.InsecureClient(
'http://%s:%s' % (hdfs_host, str(hdfs_port)), user=hdfs_user)

@classmethod
def scheme(cls):
return 'hdfs'

@staticmethod
def _parse_url(url):
def _parse_url(self, url, full_urls=None):
"""Verifies that url begins with hdfs:// prefix, strips it and adds a
leading /.
Raises:
ValueError if url doesn't begin with hdfs://.
Parsing behavior is determined by HadoopFileSystemOptions.hdfs_full_urls.
Args:
url: A URL in the form hdfs://path/...
url: (str) A URL in the form hdfs://path/...
or in the form hdfs://server/path/...
full_urls: (bool) For testing purposes only. Overrides full_urls setting.
Raises:
ValueError if the URL doesn't match the expected format.
Returns:
For an input of 'hdfs://path/...', will return '/path/...'.
(str, str) A (server, path) tuple. For an input of 'hdfs://[server/]path/...',
returns (server, '/path/...'); server is None when full_urls is disabled.
"""
m = _URL_RE.match(url)
if m is None:
raise ValueError('Could not parse url: %s' % url)
return m.group(1)
if full_urls is None:
full_urls = self.full_urls

if not full_urls:
m = _URL_RE.match(url)
if m is None:
raise ValueError('Could not parse url: %s' % url)
return None, m.group(1)
else:
m = _FULL_URL_RE.match(url)
if m is None:
raise ValueError('Could not parse url: %s' % url)
return m.groups()

def join(self, base_url, *paths):
"""Join two or more pathname components.
@@ -163,19 +181,25 @@ def join(self, base_url, *paths):
Returns:
Full url after combining all the passed components.
"""
basepath = self._parse_url(base_url)
return _HDFS_PREFIX + self._join(basepath, *paths)
server, basepath = self._parse_url(base_url)
# TODO full_urls check and test
return _HDFS_PREFIX + self._join(server, basepath, *paths)

def _join(self, server, basepath, *paths):
# 'server' is accepted to mirror _parse_url's return value; it is not yet
# folded into the joined path (see the full_urls TODOs above).
return posixpath.join(basepath, *paths)

def split(self, url):
rel_path = self._parse_url(url)
server, rel_path = self._parse_url(url)
if server is None:
server = ''
else:
server = '/' + server
head, tail = posixpath.split(rel_path)
return _HDFS_PREFIX + head, tail
# TODO full_urls check and test
return _HDFS_PREFIX + server + head, tail

def mkdirs(self, url):
path = self._parse_url(url)
_, path = self._parse_url(url)
if self._exists(path):
raise BeamIOError('Path already exists: %s' % path)
return self._mkdirs(path)
@@ -188,9 +212,10 @@ def has_dirs(self):

def _list(self, url):
try:
path = self._parse_url(url)
server, path = self._parse_url(url)
for res in self._hdfs_client.list(path, status=True):
yield FileMetadata(_HDFS_PREFIX + self._join(path, res[0]),
# TODO full_urls check and test
yield FileMetadata(_HDFS_PREFIX + self._join(server, path, res[0]),
res[1][_FILE_STATUS_LENGTH])
except Exception as e: # pylint: disable=broad-except
raise BeamIOError('List operation failed', {url: e})
@@ -213,7 +238,7 @@ def create(self, url, mime_type='application/octet-stream',
Returns:
A Python File-like object.
"""
path = self._parse_url(url)
_, path = self._parse_url(url)
return self._create(path, mime_type, compression_type)

def _create(self, path, mime_type='application/octet-stream',
Expand All @@ -230,7 +255,7 @@ def open(self, url, mime_type='application/octet-stream',
Returns:
A Python File-like object.
"""
path = self._parse_url(url)
_, path = self._parse_url(url)
return self._open(path, mime_type, compression_type)

def _open(self, path, mime_type='application/octet-stream',
@@ -289,8 +314,8 @@ def _copy_path(source, destination):
exceptions = {}
for source, destination in zip(source_file_names, destination_file_names):
try:
rel_source = self._parse_url(source)
rel_destination = self._parse_url(destination)
_, rel_source = self._parse_url(source)
_, rel_destination = self._parse_url(destination)
_copy_path(rel_source, rel_destination)
except Exception as e: # pylint: disable=broad-except
exceptions[(source, destination)] = e
@@ -302,8 +327,8 @@ def rename(self, source_file_names, destination_file_names):
exceptions = {}
for source, destination in zip(source_file_names, destination_file_names):
try:
rel_source = self._parse_url(source)
rel_destination = self._parse_url(destination)
_, rel_source = self._parse_url(source)
_, rel_destination = self._parse_url(destination)
try:
self._hdfs_client.rename(rel_source, rel_destination)
except hdfs.HdfsError as e:
@@ -324,7 +349,7 @@ def exists(self, url):
Returns:
True if url exists as a file or directory in HDFS.
"""
path = self._parse_url(url)
_, path = self._parse_url(url)
return self._exists(path)

def _exists(self, path):
@@ -336,7 +361,7 @@ def _exists(self, path):
return self._hdfs_client.status(path, strict=False) is not None

def size(self, url):
path = self._parse_url(url)
_, path = self._parse_url(url)
status = self._hdfs_client.status(path, strict=False)
if status is None:
raise BeamIOError('File not found: %s' % url)
@@ -351,7 +376,7 @@ def checksum(self, url):
Returns:
String describing the checksum.
"""
path = self._parse_url(url)
_, path = self._parse_url(url)
file_checksum = self._hdfs_client.checksum(path)
return '%s-%d-%s' % (
file_checksum[_FILE_CHECKSUM_ALGORITHM],
@@ -363,7 +388,7 @@ def delete(self, urls):
exceptions = {}
for url in urls:
try:
path = self._parse_url(url)
_, path = self._parse_url(url)
self._hdfs_client.delete(path, recursive=True)
except Exception as e: # pylint: disable=broad-except
exceptions[url] = e
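For orientation, a minimal standalone sketch (not part of this commit) of how the two regexes added above split a URL under each mode; the sample URLs are illustrative:

import re

_HDFS_PREFIX = 'hdfs:/'
_URL_RE = re.compile(r'^' + _HDFS_PREFIX + r'(/.*)')
_FULL_URL_RE = re.compile(r'^' + _HDFS_PREFIX + r'/([^/]*)(/.*)')

# Default mode: everything after 'hdfs:/' is treated as the path.
print(_URL_RE.match('hdfs://dir/file').group(1))                # '/dir/file'

# --hdfs_full_urls mode: the first component is the server, the rest is the path.
print(_FULL_URL_RE.match('hdfs://namenode/dir/file').groups())  # ('namenode', '/dir/file')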
84 changes: 74 additions & 10 deletions sdks/python/apache_beam/io/hadoopfilesystem_test.py
@@ -211,16 +211,18 @@ def setUpClass(cls):
cls.assertCountEqual = cls.assertItemsEqual

def setUp(self):
# TODO: isn't this racey?
self._fake_hdfs = FakeHdfs()
hdfs.hdfs.InsecureClient = (
lambda *args, **kwargs: self._fake_hdfs)
pipeline_options = PipelineOptions()
hdfs_options = pipeline_options.view_as(HadoopFileSystemOptions)
hdfs_options.hdfs_host = ''
hdfs_options.hdfs_port = 0
hdfs_options.hdfs_host = 'host'
hdfs_options.hdfs_port = 123
hdfs_options.hdfs_user = ''

self.fs = hdfs.HadoopFileSystem(pipeline_options)
# TODO full_urls check and test
self.tmpdir = 'hdfs://test_dir'

for filename in ['old_file1', 'old_file2']:
@@ -231,6 +233,55 @@ def test_scheme(self):
self.assertEqual(self.fs.scheme(), 'hdfs')
self.assertEqual(hdfs.HadoopFileSystem.scheme(), 'hdfs')

def test_parse_url(self):
  cases = [
      ('hdfs://', (None, '/'), False),
      ('hdfs://one_element', (None, '/one_element'), False),
      ('hdfs://path/to/file', (None, '/path/to/file'), False),
      ('hdfs://any_host/path/to/file', (None, '/any_host/path/to/file'), False),
      ('hdfs://any_host/', (None, '/any_host/'), False),
      ('hdfs://any_host', (None, '/any_host'), False),

      # With full_urls=True, the first component is the server and is split
      # off; it does not have to match hdfs_host/hdfs_port in the options.
      ('hdfs://any_host/path/to/file', ('any_host', '/path/to/file'), True),
      ('hdfs://host:123/path/to/file', ('host:123', '/path/to/file'), True),
      ('hdfs://host:123/', ('host:123', '/'), True),
      ('hdfs://any_host/', ('any_host', '/'), True),
  ]
  for url, expected, full_urls in cases:
    self.assertEqual(expected, self.fs._parse_url(url, full_urls=full_urls),
                     msg=(url, expected, full_urls))

def test_parse_url_fail(self):
  # Note: with full_urls=True the server component is not validated against
  # hdfs_host/hdfs_port, so URLs like 'hdfs://bad_host:123/' still parse fine.
  cases = [
      ('hdfs:/missing_slash', False),
      ('invalid', False),

      ('hdfs:/missing_slash', True),
      ('invalid', True),
      ('hdfs://', True),
      ('hdfs://host_no_slash_at_end', True),
  ]
  for url, full_urls in cases:
    with self.assertRaises(ValueError, msg=(url, full_urls)):
      self.fs._parse_url(url, full_urls=full_urls)

def test_url_join(self):
self.assertEqual('hdfs://tmp/path/to/file',
self.fs.join('hdfs://tmp/path', 'to', 'file'))
@@ -298,6 +349,7 @@ def test_match_file_error(self):
r'^Match operation failed .* %s' % bad_url):
result = self.fs.match([bad_url, url])[0]
files = [f.path for f in result.metadata_list]
# TODO full_urls check and test
self.assertEqual(files, [self.fs._parse_url(url)])

def test_match_directory(self):
@@ -323,7 +375,7 @@ def test_create_success(self):
url = self.fs.join(self.tmpdir, 'new_file')
handle = self.fs.create(url)
self.assertIsNotNone(handle)
url = self.fs._parse_url(url)
_, url = self.fs._parse_url(url)
expected_file = FakeFile(url, 'wb')
self.assertEqual(self._fake_hdfs.files[url], expected_file)

@@ -332,7 +384,7 @@ def test_create_write_read_compressed(self):

handle = self.fs.create(url)
self.assertIsNotNone(handle)
path = self.fs._parse_url(url)
_, path = self.fs._parse_url(url)
expected_file = FakeFile(path, 'wb')
self.assertEqual(self._fake_hdfs.files[path], expected_file)
data = b'abc' * 10
@@ -527,7 +579,7 @@ def test_delete_error(self):
url2 = self.fs.join(self.tmpdir, 'old_file1')

self.assertTrue(self.fs.exists(url2))
path1 = self.fs._parse_url(url1)
_, path1 = self.fs._parse_url(url1)
with self.assertRaisesRegex(BeamIOError,
r'^Delete operation failed .* %s' % path1):
self.fs.delete([url1, url2])
@@ -538,23 +590,22 @@ class HadoopFileSystemRuntimeValueProviderTest(unittest.TestCase):
"""Tests pipeline_options, in the form of a
RuntimeValueProvider.runtime_options object."""

def test_dict_options(self):
def setUp(self):
self._fake_hdfs = FakeHdfs()
hdfs.hdfs.InsecureClient = (
lambda *args, **kwargs: self._fake_hdfs)

def test_dict_options(self):
pipeline_options = {
'hdfs_host': '',
'hdfs_port': 0,
'hdfs_user': '',
}

self.fs = hdfs.HadoopFileSystem(pipeline_options=pipeline_options)
self.assertFalse(self.fs.full_urls)

def test_dict_options_missing(self):
self._fake_hdfs = FakeHdfs()
hdfs.hdfs.InsecureClient = (
lambda *args, **kwargs: self._fake_hdfs)

with self.assertRaisesRegex(ValueError, r'hdfs_host'):
self.fs = hdfs.HadoopFileSystem(
pipeline_options={
@@ -579,6 +630,19 @@ def test_dict_options_missing(self):
}
)

def test_dict_options_full_urls(self):
  pipeline_options = {
      'hdfs_host': '',
      'hdfs_port': 0,
      'hdfs_user': '',
      'hdfs_full_urls': 'invalid',
  }

  with self.assertRaisesRegex(ValueError, r'hdfs_full_urls'):
    self.fs = hdfs.HadoopFileSystem(pipeline_options=pipeline_options)

  pipeline_options['hdfs_full_urls'] = True
  self.fs = hdfs.HadoopFileSystem(pipeline_options=pipeline_options)
  self.assertTrue(self.fs.full_urls)



if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
7 changes: 7 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
@@ -580,6 +580,13 @@ def _add_argparse_args(cls, parser):
default=None,
help=
('HDFS username to use.'))
parser.add_argument(
    '--hdfs_full_urls',
    default=False,
    action='store_true',
    help=
    ('If set, URLs will be parsed as "hdfs://server/path/...", instead of '
     '"hdfs://path/...". The "server" part will be unused (use --hdfs_host '
     'and --hdfs_port).'))

def validate(self, validator):
errors = []
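As a usage illustration (a sketch, not part of this commit), the new option can also be supplied through the dict-based pipeline_options path that HadoopFileSystem accepts; the host, port, and user values below are placeholders:

from apache_beam.io.hadoopfilesystem import HadoopFileSystem

fs = HadoopFileSystem(pipeline_options={
    'hdfs_host': 'namenode',   # placeholder
    'hdfs_port': 50070,        # placeholder
    'hdfs_user': 'beam',       # placeholder
    'hdfs_full_urls': True,    # parse URLs as hdfs://server/path/...
})
# With full URLs enabled, the server component is split off during parsing:
# fs._parse_url('hdfs://namenode/tmp/f') == ('namenode', '/tmp/f')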
