[BEAM-8399] Add --hdfs_full_urls option (#10223)
* [BEAM-8399] Add --hdfs_full_urls option

Parameterized the unit tests (with @parameterized_class) to run both with and without full URLs.
Added an integration test with --full_urls.

Increased the minimum required version of the parameterized package, which provides the parameterized_class decorator.

* Add CHANGES.md entry

* Fix test_parse_url to not modify self.fs._full_urls
udim committed Feb 14, 2020
1 parent d9099fb commit c386cc7
Showing 6 changed files with 143 additions and 45 deletions.
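For context, a minimal usage sketch of the new option (not part of this commit): it mirrors the dict-options pattern exercised in HadoopFileSystemRuntimeValueProviderTest below, and the host, port, and user values are placeholders.

```python
# Sketch only: constructing the filesystem with the new hdfs_full_urls option.
# The host/port/user values are placeholders, not taken from this commit.
from apache_beam.io.hadoopfilesystem import HadoopFileSystem

fs = HadoopFileSystem(
    pipeline_options={
        'hdfs_host': 'namenode.example.com',  # placeholder host
        'hdfs_port': 9870,                    # placeholder WebHDFS port
        'hdfs_user': 'beam',                  # placeholder user
        'hdfs_full_urls': True,               # option added by this commit
    })

# With hdfs_full_urls=True, file URLs carry the server name:
#   fs.exists('hdfs://namenode.example.com/tmp/data.txt')
# With the default (False), the legacy server-less form is expected:
#   fs.exists('hdfs://tmp/data.txt')
```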
1 change: 1 addition & 0 deletions CHANGES.md
@@ -25,6 +25,7 @@
* New highly anticipated feature Y added to JavaSDK ([BEAM-Y](https://issues.apache.org/jira/browse/BEAM-Y)).

### I/Os
* Python SDK: Adds support for standard HDFS URLs (with server name). ([#10223](https://github.com/apache/beam/pull/10223)).
* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).

### New Features / Improvements
85 changes: 54 additions & 31 deletions sdks/python/apache_beam/io/hadoopfilesystem.py
@@ -44,6 +44,7 @@

_HDFS_PREFIX = 'hdfs:/'
_URL_RE = re.compile(r'^' + _HDFS_PREFIX + r'(/.*)')
_FULL_URL_RE = re.compile(r'^' + _HDFS_PREFIX + r'/([^/]+)(/.*)*')
_COPY_BUFFER_SIZE = 2**16
_DEFAULT_BUFFER_SIZE = 20 * 1024 * 1024

@@ -116,42 +117,58 @@ def __init__(self, pipeline_options):
hdfs_host = hdfs_options.hdfs_host
hdfs_port = hdfs_options.hdfs_port
hdfs_user = hdfs_options.hdfs_user
self._full_urls = hdfs_options.hdfs_full_urls
else:
hdfs_host = pipeline_options.get('hdfs_host')
hdfs_port = pipeline_options.get('hdfs_port')
hdfs_user = pipeline_options.get('hdfs_user')
self._full_urls = pipeline_options.get('hdfs_full_urls', False)

if hdfs_host is None:
raise ValueError('hdfs_host is not set')
if hdfs_port is None:
raise ValueError('hdfs_port is not set')
if hdfs_user is None:
raise ValueError('hdfs_user is not set')
if not isinstance(self._full_urls, bool):
raise ValueError(
'hdfs_full_urls should be bool, got: %s' % self._full_urls)
self._hdfs_client = hdfs.InsecureClient(
'http://%s:%s' % (hdfs_host, str(hdfs_port)), user=hdfs_user)

@classmethod
def scheme(cls):
return 'hdfs'

def _parse_url(self, url):
"""Verifies that url begins with hdfs:// prefix, strips it and adds a
leading /.
Raises:
ValueError if url doesn't begin with hdfs://.
Parsing behavior is determined by HadoopFileSystemOptions.hdfs_full_urls.
Args:
url: A URL in the form hdfs://path/...
url: (str) A URL in the form hdfs://path/...
or in the form hdfs://server/path/...
Raises:
ValueError if the URL doesn't match the expect format.
Returns:
For an input of 'hdfs://path/...', will return '/path/...'.
(str, str) If using hdfs_full_urls, for an input of
'hdfs://server/path/...' will return (server, '/path/...').
Otherwise, for an input of 'hdfs://path/...', will return
('', '/path/...').
"""
if not self._full_urls:
m = _URL_RE.match(url)
if m is None:
raise ValueError('Could not parse url: %s' % url)
return '', m.group(1)
else:
m = _FULL_URL_RE.match(url)
if m is None:
raise ValueError('Could not parse url: %s' % url)
return m.group(1), m.group(2) or '/'

def join(self, base_url, *paths):
"""Join two or more pathname components.
@@ -164,19 +181,24 @@ def join(self, base_url, *paths):
Returns:
Full url after combining all the passed components.
"""
server, basepath = self._parse_url(base_url)
return _HDFS_PREFIX + self._join(server, basepath, *paths)

def _join(self, server, basepath, *paths):
res = posixpath.join(basepath, *paths)
if server:
server = '/' + server
return server + res

def split(self, url):
server, rel_path = self._parse_url(url)
if server:
server = '/' + server
head, tail = posixpath.split(rel_path)
return _HDFS_PREFIX + server + head, tail

def mkdirs(self, url):
_, path = self._parse_url(url)
if self._exists(path):
raise BeamIOError('Path already exists: %s' % path)
return self._mkdirs(path)
@@ -189,10 +211,10 @@ def has_dirs(self):

def _list(self, url):
try:
server, path = self._parse_url(url)
for res in self._hdfs_client.list(path, status=True):
yield FileMetadata(
_HDFS_PREFIX + self._join(server, path, res[0]),
res[1][_FILE_STATUS_LENGTH])
except Exception as e: # pylint: disable=broad-except
raise BeamIOError('List operation failed', {url: e})
@@ -222,7 +244,7 @@ def create(
Returns:
A Python File-like object.
"""
_, path = self._parse_url(url)
return self._create(path, mime_type, compression_type)

def _create(
@@ -246,7 +268,7 @@ def open(
Returns:
A Python File-like object.
"""
_, path = self._parse_url(url)
return self._open(path, mime_type, compression_type)

def _open(
@@ -293,7 +315,7 @@ def _copy_path(source, destination):

for path, dirs, files in self._hdfs_client.walk(source):
for dir in dirs:
new_dir = self._join('', destination, dir)
if not self._exists(new_dir):
self._mkdirs(new_dir)

@@ -302,13 +324,14 @@ def _copy_path(source, destination):
rel_path = ''
for file in files:
_copy_file(
self._join('', path, file),
self._join('', destination, rel_path, file))

exceptions = {}
for source, destination in zip(source_file_names, destination_file_names):
try:
_, rel_source = self._parse_url(source)
_, rel_destination = self._parse_url(destination)
_copy_path(rel_source, rel_destination)
except Exception as e: # pylint: disable=broad-except
exceptions[(source, destination)] = e
@@ -320,8 +343,8 @@ def rename(self, source_file_names, destination_file_names):
exceptions = {}
for source, destination in zip(source_file_names, destination_file_names):
try:
_, rel_source = self._parse_url(source)
_, rel_destination = self._parse_url(destination)
try:
self._hdfs_client.rename(rel_source, rel_destination)
except hdfs.HdfsError as e:
@@ -344,7 +367,7 @@ def exists(self, url):
Returns:
True if url exists as a file or directory in HDFS.
"""
_, path = self._parse_url(url)
return self._exists(path)

def _exists(self, path):
@@ -356,7 +379,7 @@ def _exists(self, path):
return self._hdfs_client.status(path, strict=False) is not None

def size(self, url):
_, path = self._parse_url(url)
status = self._hdfs_client.status(path, strict=False)
if status is None:
raise BeamIOError('File not found: %s' % url)
@@ -371,7 +394,7 @@ def checksum(self, url):
Returns:
String describing the checksum.
"""
_, path = self._parse_url(url)
file_checksum = self._hdfs_client.checksum(path)
return '%s-%d-%s' % (
file_checksum[_FILE_CHECKSUM_ALGORITHM],
@@ -383,7 +406,7 @@ def delete(self, urls):
exceptions = {}
for url in urls:
try:
_, path = self._parse_url(url)
self._hdfs_client.delete(path, recursive=True)
except Exception as e: # pylint: disable=broad-except
exceptions[url] = e
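To make the two parsing modes concrete, here is a standalone sketch of the _parse_url logic above. The regexes are copied from the diff; the module-level helper parse_url is illustrative and not part of the Beam API.

```python
# Standalone sketch of the URL parsing introduced above; parse_url is a
# hypothetical helper mirroring HadoopFileSystem._parse_url.
import re

_HDFS_PREFIX = 'hdfs:/'
_URL_RE = re.compile(r'^' + _HDFS_PREFIX + r'(/.*)')
_FULL_URL_RE = re.compile(r'^' + _HDFS_PREFIX + r'/([^/]+)(/.*)*')


def parse_url(url, full_urls=False):
  """Returns (server, path); server is '' when full_urls is False."""
  if not full_urls:
    m = _URL_RE.match(url)
    if m is None:
      raise ValueError('Could not parse url: %s' % url)
    return '', m.group(1)
  m = _FULL_URL_RE.match(url)
  if m is None:
    raise ValueError('Could not parse url: %s' % url)
  return m.group(1), m.group(2) or '/'


assert parse_url('hdfs://a/b') == ('', '/a/b')                 # legacy form
assert parse_url('hdfs://a/b', full_urls=True) == ('a', '/b')  # server + path
assert parse_url('hdfs://a', full_urls=True) == ('a', '/')     # bare server
```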
88 changes: 75 additions & 13 deletions sdks/python/apache_beam/io/hadoopfilesystem_test.py
@@ -31,6 +31,7 @@
# patches unittest.TestCase to be python3 compatible
import future.tests.base # pylint: disable=unused-import
from future.utils import itervalues
from parameterized import parameterized_class

from apache_beam.io import hadoopfilesystem as hdfs
from apache_beam.io.filesystem import BeamIOError
@@ -203,6 +204,7 @@ def checksum(self, path):
return f.get_file_checksum()


@parameterized_class(('full_urls', ), [(False, ), (True, )])
class HadoopFileSystemTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
@@ -220,7 +222,11 @@ def setUp(self):
hdfs_options.hdfs_user = ''

self.fs = hdfs.HadoopFileSystem(pipeline_options)
self.fs._full_urls = self.full_urls
if self.full_urls:
self.tmpdir = 'hdfs://server/test_dir'
else:
self.tmpdir = 'hdfs://test_dir'

for filename in ['old_file1', 'old_file2']:
url = self.fs.join(self.tmpdir, filename)
@@ -230,22 +236,63 @@ def test_scheme(self):
self.assertEqual(self.fs.scheme(), 'hdfs')
self.assertEqual(hdfs.HadoopFileSystem.scheme(), 'hdfs')

def test_parse_url(self):
cases = [
('hdfs://', ('', '/'), False),
('hdfs://', None, True),
('hdfs://a', ('', '/a'), False),
('hdfs://a', ('a', '/'), True),
('hdfs://a/', ('', '/a/'), False),
('hdfs://a/', ('a', '/'), True),
('hdfs://a/b', ('', '/a/b'), False),
('hdfs://a/b', ('a', '/b'), True),
('hdfs://a/b/', ('', '/a/b/'), False),
('hdfs://a/b/', ('a', '/b/'), True),
('hdfs:/a/b', None, False),
('hdfs:/a/b', None, True),
('invalid', None, False),
('invalid', None, True),
]
for url, expected, full_urls in cases:
if self.full_urls != full_urls:
continue
try:
result = self.fs._parse_url(url)
except ValueError:
self.assertIsNone(expected, msg=(url, expected, full_urls))
continue
self.assertEqual(expected, result, msg=(url, expected, full_urls))

def test_url_join(self):
self.assertEqual(
'hdfs://tmp/path/to/file',
self.fs.join('hdfs://tmp/path', 'to', 'file'))
self.assertEqual(
'hdfs://tmp/path/to/file', self.fs.join('hdfs://tmp/path', 'to/file'))
self.assertEqual('hdfs://tmp/path/', self.fs.join('hdfs://tmp/path/', ''))

if not self.full_urls:
self.assertEqual('hdfs://bar', self.fs.join('hdfs://foo', '/bar'))
self.assertEqual('hdfs://bar', self.fs.join('hdfs://foo/', '/bar'))
with self.assertRaises(ValueError):
self.fs.join('/no/scheme', 'file')
else:
self.assertEqual('hdfs://foo/bar', self.fs.join('hdfs://foo', '/bar'))
self.assertEqual('hdfs://foo/bar', self.fs.join('hdfs://foo/', '/bar'))

def test_url_split(self):
self.assertEqual(('hdfs://tmp/path/to', 'file'),
self.fs.split('hdfs://tmp/path/to/file'))
if not self.full_urls:
self.assertEqual(('hdfs://', 'tmp'), self.fs.split('hdfs://tmp'))
self.assertEqual(('hdfs://tmp', ''), self.fs.split('hdfs://tmp/'))
self.assertEqual(('hdfs://tmp', 'a'), self.fs.split('hdfs://tmp/a'))
else:
self.assertEqual(('hdfs://tmp/', ''), self.fs.split('hdfs://tmp'))
self.assertEqual(('hdfs://tmp/', ''), self.fs.split('hdfs://tmp/'))
self.assertEqual(('hdfs://tmp/', 'a'), self.fs.split('hdfs://tmp/a'))

self.assertEqual(('hdfs://tmp/a', ''), self.fs.split('hdfs://tmp/a/'))
with self.assertRaisesRegex(ValueError, r'parse'):
self.fs.split('tmp')

@@ -329,7 +376,7 @@ def test_create_success(self):
url = self.fs.join(self.tmpdir, 'new_file')
handle = self.fs.create(url)
self.assertIsNotNone(handle)
_, url = self.fs._parse_url(url)
expected_file = FakeFile(url, 'wb')
self.assertEqual(self._fake_hdfs.files[url], expected_file)

@@ -338,7 +385,7 @@ def test_create_write_read_compressed(self):

handle = self.fs.create(url)
self.assertIsNotNone(handle)
_, path = self.fs._parse_url(url)
expected_file = FakeFile(path, 'wb')
self.assertEqual(self._fake_hdfs.files[path], expected_file)
data = b'abc' * 10
@@ -535,7 +582,7 @@ def test_delete_error(self):
url2 = self.fs.join(self.tmpdir, 'old_file1')

self.assertTrue(self.fs.exists(url2))
_, path1 = self.fs._parse_url(url1)
with self.assertRaisesRegex(BeamIOError,
r'^Delete operation failed .* %s' % path1):
self.fs.delete([url1, url2])
@@ -545,21 +592,21 @@ class HadoopFileSystemRuntimeValueProviderTest(unittest.TestCase):
class HadoopFileSystemRuntimeValueProviderTest(unittest.TestCase):
"""Tests pipeline_options, in the form of a
RuntimeValueProvider.runtime_options object."""
def setUp(self):
self._fake_hdfs = FakeHdfs()
hdfs.hdfs.InsecureClient = (lambda *args, **kwargs: self._fake_hdfs)

def test_dict_options(self):
pipeline_options = {
'hdfs_host': '',
'hdfs_port': 0,
'hdfs_user': '',
}

self.fs = hdfs.HadoopFileSystem(pipeline_options=pipeline_options)
self.assertFalse(self.fs._full_urls)

def test_dict_options_missing(self):
with self.assertRaisesRegex(ValueError, r'hdfs_host'):
self.fs = hdfs.HadoopFileSystem(
pipeline_options={
@@ -581,6 +628,21 @@ def test_dict_options_missing(self):
'hdfs_port': 0,
})

def test_dict_options_full_urls(self):
pipeline_options = {
'hdfs_host': '',
'hdfs_port': 0,
'hdfs_user': '',
'hdfs_full_urls': 'invalid',
}

with self.assertRaisesRegex(ValueError, r'hdfs_full_urls'):
self.fs = hdfs.HadoopFileSystem(pipeline_options=pipeline_options)

pipeline_options['hdfs_full_urls'] = True
self.fs = hdfs.HadoopFileSystem(pipeline_options=pipeline_options)
self.assertTrue(self.fs._full_urls)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
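Since the whole test class is now driven by parameterized_class, a minimal illustration of the decorator may help; the class and test names below are hypothetical and not part of the Beam test suite.

```python
# Minimal illustration of parameterized_class (hypothetical class/test names).
# The decorator generates one copy of the class per parameter tuple and sets
# the named attribute ('full_urls') on each generated class.
import unittest

from parameterized import parameterized_class


@parameterized_class(('full_urls', ), [(False, ), (True, )])
class UrlModeTest(unittest.TestCase):
  def test_mode_is_bool(self):
    # self.full_urls is False in one generated class and True in the other.
    self.assertIsInstance(self.full_urls, bool)


if __name__ == '__main__':
  unittest.main()
```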
