Skip to content

Commit

Permalink
feat: perform faster ETag verification instead of SHA256 summing
Browse files Browse the repository at this point in the history
  • Loading branch information
paulmueller committed Apr 16, 2024
1 parent eb8c146 commit 22ac0bc
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
0.14.0
- feat: support new DCOR API for uploading data directly to S3
- feat: perform faster ETag verification instead of SHA256 summing
- enh: add api.errors.APIBadRequest error class
- setup: bump dclab from 0.57.0 to 0.58.2
0.13.3
Expand Down
2 changes: 1 addition & 1 deletion dcoraid/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from .ckan_api import CKANAPI
from .dataset import (dataset_create, dataset_activate, dataset_draft_remove,
dataset_draft_remove_all)
from .dataset import resource_add, resource_exists, resource_sha256_sums
from .dataset import resource_add, resource_exists
from .errors import (APIError, APIBadGatewayError, APIConflictError,
APIAuthorizationError, NoAPIKeyError, APINotFoundError,
APIGatewayTimeoutError, APIOutdatedError)
35 changes: 21 additions & 14 deletions dcoraid/api/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,27 +193,35 @@ def resource_add(dataset_id, path, api, resource_name=None,
-------
srv_time: float
Total time the server (nginx+uwsgi) needed to process the upload
etag: str
ETag of the resource on S3; returns None if the upload was not
performed via S3
See Also
--------
dcoraid.upload.queue.UploadJob
An implementation of an upload job that monitors progress.
"""
logger = logging.getLogger(__name__ + ".resource_add")
etag = None
if resource_dict is None:
resource_dict = {}
path = pathlib.Path(path)
if resource_name is None:
resource_name = path.name
srv_time = 0 # total time waited for the server to process the upload
# The ETag is computed locally only while uploading to S3, so in any other
# case (e.g. the resource already exists), no ETag is computed. We
# normally use the ETag to verify the upload, for which in this case
# we have to use the SHA256 hash of the local file.
if not exist_ok or not resource_exists(dataset_id=dataset_id,
resource_name=resource_name,
api=api):

# Perform the upload
try:
# Uploading directly to S3 is the preferred option.
resource_add_upload_direct_s3(
etag = resource_add_upload_direct_s3(
api=api,
resource_path=path,
dataset_id=dataset_id,
Expand All @@ -224,7 +232,7 @@ def resource_add(dataset_id, path, api, resource_name=None,
)
except NoS3UploadAvailableError as e:
# This is the fall-back option that causes a performance hit
# on the DCOR server and will unsupported in the future.
# on the DCOR server and will be unsupported in the future.
logger.warning(str(e))
resource_add_upload_legacy_indirect_ckan(
api=api,
Expand All @@ -250,7 +258,7 @@ def resource_add(dataset_id, path, api, resource_name=None,
f"update__resources__{res_dict['id']}": resource_dict}
api.post("package_revise", revise_dict)

return srv_time
return srv_time, etag


def resource_add_upload_direct_s3(
Expand All @@ -260,7 +268,7 @@ def resource_add_upload_direct_s3(
resource_name: str,
monitor_callback: Callable = None,
logger: logging.Logger = None,
timeout: float = 27.9):
timeout: float = 27.9) -> str:
"""Direct resource upload to S3
This is an efficient method for uploading resources. The files are
Expand All @@ -283,6 +291,13 @@ def resource_add_upload_direct_s3(
logger instance
timeout: float
timeout for the requests.post command for the upload
Returns
-------
etag: str
ETag computed for the resource during upload (can be compared
to the ETag that DCOR stores in the "etag" resource property to
verify an upload)
"""
upload_id = f"{dataset_id}/{resource_name}"
if logger is not None:
Expand All @@ -308,7 +323,7 @@ def resource_add_upload_direct_s3(
f"We have {len(upload_info['upload_urls'])} upload parts for "
f"a resource of size {file_size/1024**2:.2f} MiB of {upload_id}")

upload_s3_presigned(
etag = upload_s3_presigned(
path=resource_path,
upload_urls=upload_info["upload_urls"],
complete_url=upload_info["complete_url"],
Expand All @@ -327,6 +342,7 @@ def resource_add_upload_direct_s3(
]
}
api.post("package_revise", revise_dict)
return etag


def resource_add_upload_legacy_indirect_ckan(
Expand Down Expand Up @@ -430,12 +446,3 @@ def resource_exists(dataset_id, resource_name, api, resource_dict=None):
return True
else:
return False


def resource_sha256_sums(dataset_id, api):
    """Map each resource name of a dataset to its SHA256 sum.

    Queries ``package_show`` for `dataset_id` via `api` and returns a
    dictionary ``{resource_name: sha256}``; resources that have no
    "sha256" property yet map to ``None``.
    """
    ds_dict = api.get("package_show", id=dataset_id)
    return {res["name"]: res.get("sha256", None)
            for res in ds_dict.get("resources", [])}
6 changes: 3 additions & 3 deletions dcoraid/api/s3_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ def upload_s3_presigned(
retries: int = 3,
timeout: float = 27.3,
callback: callable = None,
):
) -> str:
"""Upload data to an S3 bucket using presigned URLS
For user convenience, this method performs some sanity checks
Expand Down Expand Up @@ -508,7 +508,7 @@ def upload_s3_presigned_multipart(
continue
else:
etag_full = get_etag_from_response(resp_compl)
if etag_full is not None:
if etag_full: # should not be None or an empty string
if etag_full == etag_expected:
# This is the ideal case. Everything is good.
break
Expand Down Expand Up @@ -556,7 +556,7 @@ def upload_s3_presigned_single(
retries: int = 3,
timeout: float = 27.3,
callback: callable = None
):
) -> str:
"""Upload a single file using a PUT request to a presigned URL
The returned ETag is checked against the MD5 sum of the file.
Expand Down
81 changes: 56 additions & 25 deletions dcoraid/upload/job.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import atexit
import logging
import tempfile
import pathlib
import shutil
Expand All @@ -8,11 +9,12 @@
from dclab.rtdc_dataset.check import IntegrityChecker
from dclab.cli import compress

from ..api import (dataset_activate, resource_add, resource_exists,
resource_sha256_sums)
from ..api import dataset_activate, resource_add, resource_exists
from ..common import sha256sum


logger = logging.getLogger(__name__)

#: Valid job states (in more or less chronological order)
JOB_STATES = [
"init", # initial
Expand Down Expand Up @@ -95,6 +97,8 @@ def __init__(self, api, dataset_id, resource_paths,
self.file_sizes = [pathlib.Path(ff).stat().st_size
for ff in self.paths]
self.file_bytes_uploaded = [0] * len(self.paths)
#: ETags for the files uploaded by this UploadJob instance
self.etags = [None] * len(self.paths)
self.index = 0
self.start_time = None
self.end_time = None
Expand Down Expand Up @@ -305,7 +309,9 @@ def set_state(self, state):
"""
if state not in JOB_STATES:
raise ValueError("Unknown state: '{}'".format(state))
self.state = state
if state != self.state:
logger.info(f"New state: {state}")
self.state = state

def task_compress_resources(self):
"""Compress resources if they are not fully compressed
Expand Down Expand Up @@ -427,14 +433,15 @@ def task_upload_resources(self):
continue
else:
# Normal upload.
srv_time = resource_add(
srv_time, etag = resource_add(
dataset_id=self.dataset_id,
path=path,
resource_name=resource_name,
resource_dict=resource_supplement,
api=self.api,
exist_ok=True,
monitor_callback=self.monitor_callback)
self.etags[ii] = etag
self.paths_uploaded.append(path)
self.wait_time += srv_time
self.end_time = time.perf_counter()
Expand All @@ -445,44 +452,68 @@ def task_upload_resources(self):
+ "'{}'!".format(self.state))

def task_verify_resources(self):
"""Perform SHA256 verification"""
"""Perform ETag or SHA256 verification"""
if self.state == "online":
# Make sure that all SHA256 sums are already available online.
for ii in range(500):
sha256dict = resource_sha256_sums(
dataset_id=self.dataset_id,
api=self.api)
missing = [n for n in sha256dict if sha256dict[n] is None]
if missing:
# A resource can either be verified via its SHA256 sum or via
# the ETag that is computed in the case of an S3 upload.
# For every path in self.paths, this list tracks all files that
# have been verified.
verifiable_files = [False] * len(self.paths)
verified_files = [False] * len(self.paths)
sha256_dcor = [None] * len(self.paths)
for _ in range(500):
ds_dict = self.api.get("package_show", id=self.dataset_id)
resources = ds_dict.get("resources", [])
if len(resources) == len(verifiable_files):
for ii, res_dict in enumerate(resources):
if (self.etags[ii] is not None
and self.etags[ii] == res_dict.get("etag")):
verifiable_files[ii] = True
verified_files[ii] = True
if sha256 := res_dict.get("sha256"):
sha256_dcor[ii] = sha256
verifiable_files[ii] = True

if not all(verifiable_files):
self.set_state("wait-dcor")
time.sleep(1)
continue
else:
# all SHA256 sums are available
# ETags or SHA256 sums are available
break
else:
# things are taking too long
self.set_state("error")
msg_parts = ["SHA256 sums not computed by DCOR:"]
msg_parts += [f" - {name}" for name in missing]
msg_parts = ["ETags or SHA256 sums not populated by DCOR:"]
for ii, res_dict in enumerate(resources):
if not verified_files[ii]:
msg_parts += [f" - {res_dict['name']}"]
self.traceback = "\n".join(msg_parts)

# Only start verification if all SHA256 sums are available.
if not missing:
if all(verifiable_files):
self.set_state("verify")
bad_sha256 = []
bad_checksums = []
for ii, path in enumerate(self.paths):
resource_name = self.resource_names[ii]
# compute SHA256 sum
sha = sha256sum(path)
if sha != sha256dict[resource_name]:
bad_sha256.append(
[resource_name, sha, sha256dict[resource_name]])
if bad_sha256:
if verified_files[ii]:
# verified using ETag
logger.info(f"ETag verified for {path}")
continue
# must verify with SHA256 sum
sha256_path = sha256sum(path)
verified_files[ii] = sha256_dcor[ii] == sha256_path
if verified_files[ii]:
logger.info(f"SHA256 verified for {path}")
else:
bad_checksums.append(
[self.paths[ii], sha256_path, sha256_dcor[ii]])
logger.error(f"SHA256 verification failed for {path}")

if bad_checksums:
# we have bad resources, tell the user
self.set_state("error")
msg_parts = ["SHA256 sum failed for resources:"]
for item in bad_sha256:
for item in bad_checksums:
msg_parts.append("'{}' ({} vs. {})!".format(*item))
self.traceback = "\n".join(msg_parts)
else:
Expand Down

0 comments on commit 22ac0bc

Please sign in to comment.