Skip to content

Commit

Permalink
Merge pull request #448 from TeskaLabs/Bug/Multiple-items-library
Browse files Browse the repository at this point in the history
Bug: Multiple items with same name from different  providers
  • Loading branch information
mithunbharadwaj authored Jul 27, 2023
2 parents 47f7ede + 4f3a33c commit 7c76f10
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 51 deletions.
5 changes: 3 additions & 2 deletions asab/library/providers/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
class LibraryProviderABC(object):


def __init__(self, library):
def __init__(self, library, layer):
super().__init__()
self.App = library.App
self.Library = library
self.Layer = layer
self.IsReady = False


Expand All @@ -25,7 +26,7 @@ async def read(self, path: str) -> typing.IO:
raise NotImplementedError("{}.read()".format(self.__class__.__name__))


async def list(self, path: str, index) -> list:
async def list(self, path: str) -> list:
"""
It lists all items in the library at the given path.
Expand Down
6 changes: 3 additions & 3 deletions asab/library/providers/azurestorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,9 @@ class AzureStorageLibraryProvider(LibraryProviderABC):
'''

def __init__(self, library, path):
super().__init__(library)
def __init__(self, library, path, layer):
super().__init__(library, layer)
assert path[:6] == "azure+"

self.URL = urllib.parse.urlparse(path[6:])
self.Model = None # Will be set by `_load_model` method
self.Path = path
Expand Down Expand Up @@ -147,6 +146,7 @@ async def list(self, path: str) -> list:
items.append(LibraryItem(
name=i.name,
type=i.type,
layer=self.Layer,
providers=[self],
))

Expand Down
12 changes: 6 additions & 6 deletions asab/library/providers/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@

class FileSystemLibraryProvider(LibraryProviderABC):

def __init__(self, library, path, *, set_ready=True):
def __init__(self, library, path, layer, *, set_ready=True):
'''
`set_ready` can be used to disable/defer `self._set_ready` call.
'''

super().__init__(library)
super().__init__(library, layer)
self.BasePath = os.path.abspath(path)
while self.BasePath.endswith("/"):
self.BasePath = self.BasePath[:-1]
Expand Down Expand Up @@ -78,12 +78,12 @@ async def read(self, path: str) -> typing.IO:
return None


async def list(self, path: str, index) -> list:
async def list(self, path: str) -> list:
# This list method is completely synchronous, but it should look like asynchronous to make all list methods unified among providers.
return self._list(path, index)
return self._list(path)


def _list(self, path: str, index: int):
def _list(self, path: str):

node_path = self.BasePath + path

Expand Down Expand Up @@ -121,7 +121,7 @@ def _list(self, path: str, index: int):
items.append(LibraryItem(
name=fname,
type=ftype,
layer=index,
layer=self.Layer,
providers=[self],
))

Expand Down
4 changes: 2 additions & 2 deletions asab/library/providers/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class GitLibraryProvider(FileSystemLibraryProvider):
[library:git]
repodir=<optional location of the repository cache>
"""
def __init__(self, library, path):
def __init__(self, library, path, layer):

# format: 'git+http[s]://[<username>:<deploy token>@]<url>[#<branch>]'
pattern = re.compile(r"git\+(https?://)((.*):(.*)@)?([^#]*)(?:#(.*))?$")
Expand All @@ -57,7 +57,7 @@ def __init__(self, library, path):
hashlib.sha256(path.encode('utf-8')).hexdigest()
)

super().__init__(library, self.RepoPath, set_ready=False)
super().__init__(library, self.RepoPath, layer, set_ready=False)

self.GitRepository = None

Expand Down
9 changes: 4 additions & 5 deletions asab/library/providers/zookeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,15 @@ class ZooKeeperLibraryProvider(LibraryProviderABC):
"""

def __init__(self, library, path):
super().__init__(library)
def __init__(self, library, path, layer):
super().__init__(library, layer)

url_pieces = urllib.parse.urlparse(path)

self.FullPath = url_pieces.scheme + '://'
self.BasePath = url_pieces.path.lstrip("/")
while self.BasePath.endswith("/"):
self.BasePath = self.BasePath[:-1]

self.BasePath = '/' + self.BasePath
if self.BasePath == '/':
self.BasePath = ''
Expand Down Expand Up @@ -250,7 +249,7 @@ async def read(self, path: str) -> typing.IO:
else:
return None

async def list(self, path: str, index) -> list:
async def list(self, path: str) -> list:
if self.Zookeeper is None:
L.warning("Zookeeper Client has not been established (yet). Cannot list {}".format(path))
raise RuntimeError("Zookeeper Client has not been established (yet). Not ready.")
Expand Down Expand Up @@ -279,7 +278,7 @@ async def list(self, path: str, index) -> list:
items.append(LibraryItem(
name=fname,
type=ftype,
layer=index,
layer=self.Layer,
providers=[self],
))

Expand Down
51 changes: 18 additions & 33 deletions asab/library/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

L = logging.getLogger(__name__)


#


Expand Down Expand Up @@ -75,38 +76,36 @@ def __init__(self, app, service_name, paths=None):
if isinstance(paths, str):
paths = re.split(r"\s+", paths)

for path in paths:
self._create_library(path)
for layer, path in enumerate(paths):
# Create library for each layer of paths
self._create_library(path, layer)
app.PubSub.subscribe("Application.tick/60!", self.on_tick)


async def finalize(self, app):
while len(self.Libraries) > 0:
lib = self.Libraries.pop(-1)
await lib.finalize(self.App)


async def on_tick(self, message_type):
await self._read_disabled()


def _create_library(self, path):
def _create_library(self, path, layer):
library_provider = None
if path.startswith('zk://') or path.startswith('zookeeeper://'):
from .providers.zookeeper import ZooKeeperLibraryProvider
library_provider = ZooKeeperLibraryProvider(self, path)
library_provider = ZooKeeperLibraryProvider(self, path, layer)

elif path.startswith('./') or path.startswith('/') or path.startswith('file://'):
from .providers.filesystem import FileSystemLibraryProvider
library_provider = FileSystemLibraryProvider(self, path)
library_provider = FileSystemLibraryProvider(self, path, layer)

elif path.startswith('azure+https://'):
from .providers.azurestorage import AzureStorageLibraryProvider
library_provider = AzureStorageLibraryProvider(self, path)
library_provider = AzureStorageLibraryProvider(self, path, layer)

elif path.startswith('git+'):
from .providers.git import GitLibraryProvider
library_provider = GitLibraryProvider(self, path)
library_provider = GitLibraryProvider(self, path, layer)

elif path == '' or path.startswith("#") or path.startswith(";"):
# This is empty or commented line
Expand All @@ -118,7 +117,6 @@ def _create_library(self, path):

self.Libraries.append(library_provider)


def is_ready(self):
"""
It checks if all the libraries are ready.
Expand All @@ -134,7 +132,6 @@ def is_ready(self):
True
)


async def _set_ready(self, provider):
if len(self.Libraries) == 0:
return
Expand All @@ -149,8 +146,6 @@ async def _set_ready(self, provider):
L.log(LOG_NOTICE, "is NOT ready.", struct_data={'name': self.Name})
self.App.PubSub.publish("Library.not_ready!", self)



async def read(self, path: str, tenant: str = None) -> typing.IO:
"""
Read the content of the library item specified by `path`.
Expand Down Expand Up @@ -188,7 +183,6 @@ async def read(self, path: str, tenant: str = None) -> typing.IO:

return None


async def list(self, path="/", tenant=None, recursive=False) -> list:
"""
List the directory of the library specified by the path.
Expand Down Expand Up @@ -236,17 +230,15 @@ async def list(self, path="/", tenant=None, recursive=False) -> list:
child_items = await self._list(item.name, tenant, providers=item.providers)
items.extend(child_items)
recitems.extend(child_items)

return items


async def _list(self, path, tenant, providers):
# Execute the list query in all providers in-parallel

result = await asyncio.gather(*[
library.list(path, index)
for index, library in enumerate(providers)
library.list(path)
for library in providers
], return_exceptions=True)

items = []
uniq = dict()
for ress in result:
Expand All @@ -260,33 +252,28 @@ async def _list(self, path, tenant, providers):
continue

for item in ress:

item.disabled = self.check_disabled(item.name, tenant=tenant)

# If the item already exists, merge it
# If the item already exists, merge or override it
pitem = uniq.get(item.name)
if pitem is not None:
pitem = uniq[item.name]
if pitem.type == 'dir' and item.type == 'dir':
# Directories are joined
pitem.providers.extend(item.providers)

elif pitem.type == 'item':
for i, provider in enumerate(providers):
if provider in item.providers:
index = i
break
pitem.override = index
# Other item types are skipped
else:
continue

uniq[item.name] = item
items.append(item)

# Other item types are skipped
else:
uniq[item.name] = item
items.append(item)
items.sort(key=lambda x: x.name)
return items


async def _read_disabled(self):
# `.disabled.yaml` is read from the first configured library
# It is applied on all libraries in the configuration.
Expand All @@ -305,7 +292,6 @@ async def _read_disabled(self):
self.Disabled = {}
L.exception("Failed to parse '/.disabled.yaml'")


def check_disabled(self, path, tenant=None):
"""
If the item is disabled for everybody, or if the item is disabled for the specified tenant, then
Expand All @@ -330,7 +316,6 @@ def check_disabled(self, path, tenant=None):

return False


async def export(self, path="/", tenant=None, remove_path=False) -> typing.IO:
"""
It takes a path, and returns a file-like object containing a gzipped tar archive of the library contents of
Expand Down

0 comments on commit 7c76f10

Please sign in to comment.