Skip to content

Commit

Permalink
Implement fileOffset in readFile
Browse files Browse the repository at this point in the history
  • Loading branch information
andrzejressel committed Apr 16, 2024
1 parent 91350dd commit 5495c15
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 12 deletions.
3 changes: 2 additions & 1 deletion zio-ftp/src/main/scala/zio/ftp/FtpAccessors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ trait FtpAccessors[+A] {
*
* @param path absolute path of a file
* @param chunkSize default chunk size is 2048 bytes
* @param fileOffset optional initial offset in bytes
*/
def readFile(path: String, chunkSize: Int = 2048): ZStream[Any, IOException, Byte]
def readFile(path: String, chunkSize: Int = 2048, fileOffset: Long = 0): ZStream[Any, IOException, Byte]

/**
* Delete a file on a server. If the operation failed, an error will be emitted
Expand Down
4 changes: 2 additions & 2 deletions zio-ftp/src/main/scala/zio/ftp/SecureFtp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ final private class SecureFtp(unsafeClient: Client) extends FtpAccessors[Client]
def stat(path: String): ZIO[Any, IOException, Option[FtpResource]] =
execute(c => Option(c.statExistence(path)).map(FtpResource(path, _)))

def readFile(path: String, chunkSize: Int): ZStream[Any, IOException, Byte] =
def readFile(path: String, chunkSize: Int, fileOffset: Long): ZStream[Any, IOException, Byte] =
for {
remoteFile <- ZStream.fromZIO(
execute(_.open(path, util.EnumSet.of(OpenMode.READ)))
)

is: java.io.InputStream = new remoteFile.ReadAheadRemoteFileInputStream(64) {
is: java.io.InputStream = new remoteFile.ReadAheadRemoteFileInputStream(64, fileOffset) {

override def close(): Unit =
try super.close()
Expand Down
3 changes: 2 additions & 1 deletion zio-ftp/src/main/scala/zio/ftp/TestFtp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,15 @@ object TestFtp {
.refineToOrDie[IOException]
}

override def readFile(path: String, chunkSize: Int): ZStream[Any, IOException, Byte] =
override def readFile(path: String, chunkSize: Int, fileOffset: Long): ZStream[Any, IOException, Byte] =
ZStream
.fromZIO(Files.readAllBytes(root / ZPath(path).elements.mkString("/")))
.catchAll {
case _: NoSuchFileException => ZStream.fail(InvalidPathError(s"File does not exist $path"))
case err => ZStream.fail(err)
}
.flatMap(ZStream.fromChunk(_))
.drop(fileOffset.toInt)

override def rm(path: String): ZIO[Any, IOException, Unit] =
Files
Expand Down
7 changes: 5 additions & 2 deletions zio-ftp/src/main/scala/zio/ftp/UnsecureFtp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ final private class UnsecureFtp(unsafeClient: Client) extends FtpAccessors[Clien
def stat(path: String): ZIO[Any, IOException, Option[FtpResource]] =
execute(c => Option(c.mlistFile(path))).map(_.map(FtpResource.fromFtpFile(_)))

def readFile(path: String, chunkSize: Int = 2048): ZStream[Any, IOException, Byte] = {
def readFile(path: String, chunkSize: Int = 2048, fileOffset: Long): ZStream[Any, IOException, Byte] = {
val initialize = execute(_.setRestartOffset(fileOffset))

val terminate = ZIO
.fail(
FileTransferIncompleteError(s"Cannot finalize the file transfer and completely read the entire file $path.")
Expand All @@ -43,7 +45,8 @@ final private class UnsecureFtp(unsafeClient: Client) extends FtpAccessors[Clien
val inputStream =
execute(c => Option(c.retrieveFileStream(path))).someOrFail(InvalidPathError(s"File does not exist $path"))

ZStream.fromInputStreamZIO(inputStream, chunkSize) ++ ZStream.fromZIO(terminate) *> ZStream.empty
(ZStream.fromZIO(initialize) *> ZStream.empty) ++ ZStream.fromInputStreamZIO(inputStream, chunkSize) ++ (ZStream
.fromZIO(terminate) *> ZStream.empty)
}

def rm(path: String): ZIO[Any, IOException, Unit] =
Expand Down
12 changes: 6 additions & 6 deletions zio-ftp/src/main/scala/zio/ftp/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ package object ftp {
_ <- ftp.upload(path, source)
} yield ()

def readFile(path: String, chunkSize: Int = 2048): ZStream[Ftp, IOException, Byte] =
ZStream.serviceWithStream(_.readFile(path, chunkSize))
def readFile(path: String, chunkSize: Int = 2048, fileOffset: Long = 0): ZStream[Ftp, IOException, Byte] =
ZStream.serviceWithStream(_.readFile(path, chunkSize, fileOffset))

def rename(oldPath: String, newPath: String): ZIO[Ftp, Exception, Unit] =
ZIO.serviceWithZIO(_.rename(oldPath, newPath))
Expand Down Expand Up @@ -100,8 +100,8 @@ package object ftp {
_ <- ftp.upload(path, source)
} yield ()

def readFile(path: String, chunkSize: Int = 2048): ZStream[SFtp, IOException, Byte] =
ZStream.serviceWithStream(_.readFile(path, chunkSize))
def readFile(path: String, chunkSize: Int = 2048, fileOffset: Long = 0): ZStream[SFtp, IOException, Byte] =
ZStream.serviceWithStream(_.readFile(path, chunkSize, fileOffset))

def rename(oldPath: String, newPath: String): ZIO[SFtp, Exception, Unit] =
ZIO.serviceWithZIO(_.rename(oldPath, newPath))
Expand Down Expand Up @@ -139,8 +139,8 @@ package object ftp {
_ <- ftp.upload(path, source)
} yield ()

def readFile(path: String, chunkSize: Int = 2048): ZStream[StubFtp, IOException, Byte] =
ZStream.serviceWithStream(_.readFile(path, chunkSize))
def readFile(path: String, chunkSize: Int = 2048, fileOffset: Long = 0): ZStream[StubFtp, IOException, Byte] =
ZStream.serviceWithStream(_.readFile(path, chunkSize, fileOffset))

def rename(oldPath: String, newPath: String): ZIO[StubFtp, Exception, Unit] =
ZIO.serviceWithZIO(_.rename(oldPath, newPath))
Expand Down
5 changes: 5 additions & 0 deletions zio-ftp/src/test/scala/zio/ftp/SecureFtpSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ object SecureFtpSpec extends ZIOSpecDefault {
|this is a beautiful day""".stripMargin
)
},
test("readFile with offset") {
for {
content <- readFile("/notes.txt", fileOffset = 16).via(utf8Decode).runCollect
} yield assert(content.mkString)(equalTo("this is a beautiful day"))
},
test("readFile does not exist") {
for {
invalid <- readFile("/invalid.txt")
Expand Down
5 changes: 5 additions & 0 deletions zio-ftp/src/test/scala/zio/ftp/StubFtpSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ object StubFtpSpec extends ZIOSpecDefault {
} yield assert(content.mkString)(equalTo("""|Hello world !!!
|this is a beautiful day""".stripMargin))
},
test("readFile with offset") {
for {
content <- readFile("/notes.txt", fileOffset = 16).via(utf8Decode).runCollect
} yield assert(content.mkString)(equalTo("this is a beautiful day"))
},
test("readFile does not exist") {
for {
invalid <- readFile("/invalid.txt")
Expand Down
5 changes: 5 additions & 0 deletions zio-ftp/src/test/scala/zio/ftp/UnsecureFtpSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ object FtpSuite {
} yield assert(content.mkString)(equalTo("""|Hello world !!!
|this is a beautiful day""".stripMargin))
},
test("readFile with offset") {
for {
content <- readFile("/notes.txt", fileOffset = 16).via(utf8Decode).runCollect
} yield assert(content.mkString)(equalTo("this is a beautiful day"))
},
test("readFile does not exist") {
for {
invalid <- readFile("/invalid.txt")
Expand Down

0 comments on commit 5495c15

Please sign in to comment.