From 5495c15f59ca179423585992d800ac9591a4aba8 Mon Sep 17 00:00:00 2001 From: Andrzej Ressel Date: Wed, 17 Apr 2024 00:21:43 +0200 Subject: [PATCH] Implement fileOffset in readFile --- zio-ftp/src/main/scala/zio/ftp/FtpAccessors.scala | 3 ++- zio-ftp/src/main/scala/zio/ftp/SecureFtp.scala | 4 ++-- zio-ftp/src/main/scala/zio/ftp/TestFtp.scala | 3 ++- zio-ftp/src/main/scala/zio/ftp/UnsecureFtp.scala | 7 +++++-- zio-ftp/src/main/scala/zio/ftp/package.scala | 12 ++++++------ zio-ftp/src/test/scala/zio/ftp/SecureFtpSpec.scala | 5 +++++ zio-ftp/src/test/scala/zio/ftp/StubFtpSpec.scala | 5 +++++ zio-ftp/src/test/scala/zio/ftp/UnsecureFtpSpec.scala | 5 +++++ 8 files changed, 32 insertions(+), 12 deletions(-) diff --git a/zio-ftp/src/main/scala/zio/ftp/FtpAccessors.scala b/zio-ftp/src/main/scala/zio/ftp/FtpAccessors.scala index 3b012c15..c377bad3 100644 --- a/zio-ftp/src/main/scala/zio/ftp/FtpAccessors.scala +++ b/zio-ftp/src/main/scala/zio/ftp/FtpAccessors.scala @@ -27,8 +27,9 @@ trait FtpAccessors[+A] { * * @param path absolute path of a file * @param chunkSize default chunk size is 2048 bytes + * @param fileOffset optional initial offset in bytes */ - def readFile(path: String, chunkSize: Int = 2048): ZStream[Any, IOException, Byte] + def readFile(path: String, chunkSize: Int = 2048, fileOffset: Long = 0): ZStream[Any, IOException, Byte] /** * Delete a file on a server. If the operation failed, an error will be emitted diff --git a/zio-ftp/src/main/scala/zio/ftp/SecureFtp.scala b/zio-ftp/src/main/scala/zio/ftp/SecureFtp.scala index 115ec4ce..fa2c7681 100644 --- a/zio-ftp/src/main/scala/zio/ftp/SecureFtp.scala +++ b/zio-ftp/src/main/scala/zio/ftp/SecureFtp.scala @@ -41,13 +41,13 @@ final private class SecureFtp(unsafeClient: Client) extends FtpAccessors[Client] def stat(path: String): ZIO[Any, IOException, Option[FtpResource]] = execute(c => Option(c.statExistence(path)).map(FtpResource(path, _))) - def readFile(path: String, chunkSize: Int): ZStream[Any, IOException, Byte] = + def readFile(path: String, chunkSize: Int, fileOffset: Long): ZStream[Any, IOException, Byte] = for { remoteFile <- ZStream.fromZIO( execute(_.open(path, util.EnumSet.of(OpenMode.READ))) ) - is: java.io.InputStream = new remoteFile.ReadAheadRemoteFileInputStream(64) { + is: java.io.InputStream = new remoteFile.ReadAheadRemoteFileInputStream(64, fileOffset) { override def close(): Unit = try super.close() diff --git a/zio-ftp/src/main/scala/zio/ftp/TestFtp.scala b/zio-ftp/src/main/scala/zio/ftp/TestFtp.scala index 2bde1dfe..7aa5ea92 100644 --- a/zio-ftp/src/main/scala/zio/ftp/TestFtp.scala +++ b/zio-ftp/src/main/scala/zio/ftp/TestFtp.scala @@ -42,7 +42,7 @@ object TestFtp { .refineToOrDie[IOException] } - override def readFile(path: String, chunkSize: Int): ZStream[Any, IOException, Byte] = + override def readFile(path: String, chunkSize: Int, fileOffset: Long): ZStream[Any, IOException, Byte] = ZStream .fromZIO(Files.readAllBytes(root / ZPath(path).elements.mkString("/"))) .catchAll { @@ -50,6 +50,7 @@ object TestFtp { case err => ZStream.fail(err) } .flatMap(ZStream.fromChunk(_)) + .drop(fileOffset.toInt) override def rm(path: String): ZIO[Any, IOException, Unit] = Files diff --git a/zio-ftp/src/main/scala/zio/ftp/UnsecureFtp.scala b/zio-ftp/src/main/scala/zio/ftp/UnsecureFtp.scala index f641cf37..f7c045d1 100644 --- a/zio-ftp/src/main/scala/zio/ftp/UnsecureFtp.scala +++ b/zio-ftp/src/main/scala/zio/ftp/UnsecureFtp.scala @@ -33,7 +33,9 @@ final private class UnsecureFtp(unsafeClient: Client) extends FtpAccessors[Clien def stat(path: String): ZIO[Any, IOException, Option[FtpResource]] = execute(c => Option(c.mlistFile(path))).map(_.map(FtpResource.fromFtpFile(_))) - def readFile(path: String, chunkSize: Int = 2048): ZStream[Any, IOException, Byte] = { + def readFile(path: String, chunkSize: Int = 2048, fileOffset: Long): ZStream[Any, IOException, Byte] = { + val initialize = execute(_.setRestartOffset(fileOffset)) + val terminate = ZIO .fail( FileTransferIncompleteError(s"Cannot finalize the file transfer and completely read the entire file $path.") @@ -43,7 +45,8 @@ final private class UnsecureFtp(unsafeClient: Client) extends FtpAccessors[Clien val inputStream = execute(c => Option(c.retrieveFileStream(path))).someOrFail(InvalidPathError(s"File does not exist $path")) - ZStream.fromInputStreamZIO(inputStream, chunkSize) ++ ZStream.fromZIO(terminate) *> ZStream.empty + (ZStream.fromZIO(initialize) *> ZStream.empty) ++ ZStream.fromInputStreamZIO(inputStream, chunkSize) ++ (ZStream + .fromZIO(terminate) *> ZStream.empty) } def rm(path: String): ZIO[Any, IOException, Unit] = diff --git a/zio-ftp/src/main/scala/zio/ftp/package.scala b/zio-ftp/src/main/scala/zio/ftp/package.scala index dbe23f07..298b627e 100644 --- a/zio-ftp/src/main/scala/zio/ftp/package.scala +++ b/zio-ftp/src/main/scala/zio/ftp/package.scala @@ -61,8 +61,8 @@ package object ftp { _ <- ftp.upload(path, source) } yield () - def readFile(path: String, chunkSize: Int = 2048): ZStream[Ftp, IOException, Byte] = - ZStream.serviceWithStream(_.readFile(path, chunkSize)) + def readFile(path: String, chunkSize: Int = 2048, fileOffset: Long = 0): ZStream[Ftp, IOException, Byte] = + ZStream.serviceWithStream(_.readFile(path, chunkSize, fileOffset)) def rename(oldPath: String, newPath: String): ZIO[Ftp, Exception, Unit] = ZIO.serviceWithZIO(_.rename(oldPath, newPath)) @@ -100,8 +100,8 @@ package object ftp { _ <- ftp.upload(path, source) } yield () - def readFile(path: String, chunkSize: Int = 2048): ZStream[SFtp, IOException, Byte] = - ZStream.serviceWithStream(_.readFile(path, chunkSize)) + def readFile(path: String, chunkSize: Int = 2048, fileOffset: Long = 0): ZStream[SFtp, IOException, Byte] = + ZStream.serviceWithStream(_.readFile(path, chunkSize, fileOffset)) def rename(oldPath: String, newPath: String): ZIO[SFtp, Exception, Unit] = ZIO.serviceWithZIO(_.rename(oldPath, newPath)) @@ -139,8 +139,8 @@ package object ftp { _ <- ftp.upload(path, source) } yield () - def readFile(path: String, chunkSize: Int = 2048): ZStream[StubFtp, IOException, Byte] = - ZStream.serviceWithStream(_.readFile(path, chunkSize)) + def readFile(path: String, chunkSize: Int = 2048, fileOffset: Long = 0): ZStream[StubFtp, IOException, Byte] = + ZStream.serviceWithStream(_.readFile(path, chunkSize, fileOffset)) def rename(oldPath: String, newPath: String): ZIO[StubFtp, Exception, Unit] = ZIO.serviceWithZIO(_.rename(oldPath, newPath)) diff --git a/zio-ftp/src/test/scala/zio/ftp/SecureFtpSpec.scala b/zio-ftp/src/test/scala/zio/ftp/SecureFtpSpec.scala index b63d37a3..4adb7c48 100644 --- a/zio-ftp/src/test/scala/zio/ftp/SecureFtpSpec.scala +++ b/zio-ftp/src/test/scala/zio/ftp/SecureFtpSpec.scala @@ -122,6 +122,11 @@ object SecureFtpSpec extends ZIOSpecDefault { |this is a beautiful day""".stripMargin ) }, + test("readFile with offset") { + for { + content <- readFile("/notes.txt", fileOffset = 16).via(utf8Decode).runCollect + } yield assert(content.mkString)(equalTo("this is a beautiful day")) + }, test("readFile does not exist") { for { invalid <- readFile("/invalid.txt") diff --git a/zio-ftp/src/test/scala/zio/ftp/StubFtpSpec.scala b/zio-ftp/src/test/scala/zio/ftp/StubFtpSpec.scala index 94455547..5683b45e 100644 --- a/zio-ftp/src/test/scala/zio/ftp/StubFtpSpec.scala +++ b/zio-ftp/src/test/scala/zio/ftp/StubFtpSpec.scala @@ -67,6 +67,11 @@ object StubFtpSpec extends ZIOSpecDefault { } yield assert(content.mkString)(equalTo("""|Hello world !!! |this is a beautiful day""".stripMargin)) }, + test("readFile with offset") { + for { + content <- readFile("/notes.txt", fileOffset = 16).via(utf8Decode).runCollect + } yield assert(content.mkString)(equalTo("this is a beautiful day")) + }, test("readFile does not exist") { for { invalid <- readFile("/invalid.txt") diff --git a/zio-ftp/src/test/scala/zio/ftp/UnsecureFtpSpec.scala b/zio-ftp/src/test/scala/zio/ftp/UnsecureFtpSpec.scala index 779c027c..11c948c7 100644 --- a/zio-ftp/src/test/scala/zio/ftp/UnsecureFtpSpec.scala +++ b/zio-ftp/src/test/scala/zio/ftp/UnsecureFtpSpec.scala @@ -106,6 +106,11 @@ object FtpSuite { } yield assert(content.mkString)(equalTo("""|Hello world !!! |this is a beautiful day""".stripMargin)) }, + test("readFile with offset") { + for { + content <- readFile("/notes.txt", fileOffset = 16).via(utf8Decode).runCollect + } yield assert(content.mkString)(equalTo("this is a beautiful day")) + }, test("readFile does not exist") { for { invalid <- readFile("/invalid.txt")