Skip to content

Commit

Permalink
Implement tus uploads in Pulsar client.
Browse files Browse the repository at this point in the history
  • Loading branch information
jmchilton committed Jan 10, 2024
1 parent 5d09683 commit 610408c
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 0 deletions.
32 changes: 32 additions & 0 deletions pulsar/client/action_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
scp_get_file,
scp_post_file,
)
from .transport.tus import (
tus_upload_file,
)
from .util import (
copy_to_path,
directory_files,
Expand Down Expand Up @@ -482,6 +485,34 @@ def write_from_path(self, pulsar_path):
post_file(self.url, pulsar_path)


class RemoteTransferTusAction(BaseAction):
""" This action indicates the Pulsar server should transfer the file before
execution via one of the remote transfer implementations. This is like a TransferAction, but
it indicates the action requires network access to the staging server and TUS
will be used for the transfer
"""
inject_url = True
action_type = "remote_transfer_tus"
staging = STAGING_ACTION_REMOTE

def __init__(self, source, file_lister=None, url=None):
super().__init__(source, file_lister=file_lister)
self.url = url

def to_dict(self):
return self._extend_base_dict(url=self.url)

@classmethod
def from_dict(cls, action_dict):
return RemoteTransferAction(source=action_dict["source"], url=action_dict["url"])

def write_to_path(self, path):
get_file(self.url, path)

def write_from_path(self, pulsar_path):
tus_upload_file(self.url, pulsar_path)


class RemoteObjectStoreCopyAction(BaseAction):
"""
"""
Expand Down Expand Up @@ -817,6 +848,7 @@ def unstructured_map(self, path):
CopyAction,
RemoteCopyAction,
RemoteTransferAction,
RemoteTransferTusAction,
RemoteObjectStoreCopyAction,
RsyncTransferAction,
ScpTransferAction,
Expand Down
7 changes: 7 additions & 0 deletions test/client_transport_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pulsar.client.transport.curl import PycurlTransport
from pulsar.client.transport.curl import post_file
from pulsar.client.transport.curl import get_file
from pulsar.client.transport.tus import find_tus_endpoint
from pulsar.client.transport import get_transport

from .test_utils import files_server
Expand Down Expand Up @@ -107,6 +108,12 @@ def test_curl_problems():
assert exception_raised


def test_find_tus_endpoint():
galaxy_endpoint = "http://subdomain.galaxy.org/prefix/api/jobs/1231sdfsq23e/files?job_key=34"
tus_endpoint = find_tus_endpoint(galaxy_endpoint)
assert tus_endpoint == "http://subdomain.galaxy.org/prefix/api/job_files/resumable_upload?job_key=34"


def test_get_transport():
assert type(get_transport(None, FakeOsModule("1"))) == PycurlTransport
assert type(get_transport(None, FakeOsModule("TRUE"))) == PycurlTransport
Expand Down

0 comments on commit 610408c

Please sign in to comment.