Skip to content

Commit

Permalink
[PATCH] Amended Forex Service and Added Refresh Task
Browse files Browse the repository at this point in the history
  • Loading branch information
vijay-kumar-ponraj committed Nov 18, 2024
1 parent d689678 commit 81de7fe
Show file tree
Hide file tree
Showing 22 changed files with 320 additions and 17 deletions.
7 changes: 7 additions & 0 deletions forex-mtl/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,10 @@ libraryDependencies ++= Seq(
Libraries.scalaCheck % Test,
Libraries.catsScalaCheck % Test
)

libraryDependencies ++= Seq(
"net.debasishg" %% "redisclient" % "3.41"
)

libraryDependencies += "com.lihaoyi" %% "requests" % "0.8.0"

5 changes: 4 additions & 1 deletion forex-mtl/src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
app {
http {
host = "0.0.0.0"
port = 8080
port = 8081
timeout = 40 seconds
}
oneframe {
token = "10dc303535874aeccc86a8251e6992f5"
}
}

12 changes: 8 additions & 4 deletions forex-mtl/src/main/scala/forex/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@ import org.http4s.blaze.server.BlazeServerBuilder

object Main extends IOApp {

override def run(args: List[String]): IO[ExitCode] =
new Application[IO].stream(executionContext).compile.drain.as(ExitCode.Success)

override def run(args: List[String]): IO[ExitCode] = {
val startupTask = IO {StartupTasks.process()}
val app: IO[ExitCode] = new Application[IO].stream(executionContext).compile.drain.as(ExitCode.Success)
for {
_ <- startupTask.start //This will run async and starts another task after 4minutes to refresh cache...
exitCode <- app //This will run the app in current thread
} yield { exitCode}
}
}

class Application[F[_]: ConcurrentEffect: Timer] {
Expand All @@ -24,5 +29,4 @@ class Application[F[_]: ConcurrentEffect: Timer] {
.withHttpApp(module.httpApp)
.serve
} yield ()

}
12 changes: 12 additions & 0 deletions forex-mtl/src/main/scala/forex/StartupTasks.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package forex

import cats.effect.{ExitCode, IO}
import forex.components.scheduler.tasks.TaskScheduler
import forex.thirdparties.oneframae.OneFrameForexRatesHandler

object StartupTasks {
def process(): IO[ExitCode] = {
TaskScheduler.scheduleForexCacheRefreshJob
IO(OneFrameForexRatesHandler.handleCache()).as(ExitCode.Success)
}
}
11 changes: 11 additions & 0 deletions forex-mtl/src/main/scala/forex/components/cache/Algebra.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package forex.components.cache

import forex.services.rates.errors._


trait Algebra {

def put[A](key:String, value: A) : Boolean

def get(key:String) : Either[Error, String]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package forex.components.cache

import forex.components.cache.redis.Redis

object Protocol {
def redis(segment: String) : Algebra = new Redis(segment)
}
24 changes: 24 additions & 0 deletions forex-mtl/src/main/scala/forex/components/cache/redis/Redis.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package forex.components.cache.redis

import com.redis.RedisClient
import forex.components.cache.Algebra
import forex.programs.rates.ErrorCodes
import forex.services.rates.errors._

class Redis(segment: String) extends Algebra {

private[redis] lazy val redisClient: RedisClient = new RedisClient("localhost", 6379)

override def put[A](key: String, value: A): Boolean = {
redisClient.set(key = ForexRedisHelper.getSegmentPrefixedKey(segment, key), value = value)
}

override def get(key:String): Either[Error, String] = {
redisClient.get(ForexRedisHelper.getSegmentPrefixedKey(segment, key)) match {
case Some(value) => Right(value).withLeft[Error]
case None => Left(Error.RateLookupFailed(ErrorCodes.cacheFetchFailed, "Value Not Found In Cache"))
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package forex.components.cache.redis

abstract class RedisHelper {
def getSegmentPrefixedKey(segement: String, key:String): String
}

object Segments {
final val forexRates = "FOREX_RATES"
}

object ForexRedisHelper extends RedisHelper {
override def getSegmentPrefixedKey(segment: String, key: String): String = segment ++ "::" ++ key

def getFormattedKey(from: String, to: String): String = {
from + "&" + to
}
}
8 changes: 8 additions & 0 deletions forex-mtl/src/main/scala/forex/components/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package forex

import forex.components.cache.Algebra

package object components {
type Cache = Algebra
final val CacheAPI = cache.Protocol
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package forex.components.scheduler.tasks

import cats.effect.{Concurrent, IO, IOApp, Timer}
import cats.implicits.toFlatMapOps
import forex.domain.Tasks
import forex.thirdparties.oneframae.OneFrameForexRatesHandler
import org.log4s.{Logger, getLogger}

import scala.concurrent.duration._

class TaskScheduler(task: Tasks, f: IO[Unit], frequency: FiniteDuration) extends IOApp .Simple {

val logger: Logger = getLogger(getClass)

private def schedule[F[_] : Concurrent : Timer](task: F[Unit], frequency: FiniteDuration) : F[Unit] = {
Timer[F].sleep(frequency).flatMap(_ => task)
}

private[tasks] def scheduleJob(): Unit = {
if (TasksLibrary.isTaskSpawned(this.task)) {
logger.info("Job Already Spawned...")
} else {
schedule[IO](this.f, this.frequency)
TasksLibrary.addSpawnedTask(task) match {
case Some(_) => logger.info("Task Successfully Added To Task Library.")
case None => logger.warn("Task Addition to Library Failed. Audit and fix the issue.")
}
}
}

override def run: IO[Unit] = {
IO(scheduleJob())
}
}

object TaskScheduler {
def apply(task: Tasks, f: IO[Unit], frequency: FiniteDuration): TaskScheduler = {
TaskScheduler(task, f, frequency)
}

def scheduleForexCacheRefreshJob = {
TaskScheduler(Tasks.FOREX_JOB, OneFrameForexRatesHandler.handleCache(), 4.minutes)
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package forex.components.scheduler.tasks

import forex.domain.Tasks

import scala.collection.mutable

object TasksLibrary {
private val tasksSpawned: mutable.Map[String, Boolean] = mutable.Map[String, Boolean]()

def isTaskSpawned(task: Tasks) : Boolean = tasksSpawned(task.name())

private[tasks] def addSpawnedTask(task: Tasks) : Option[Boolean] = tasksSpawned.put(task.name(), true)
}
2 changes: 2 additions & 0 deletions forex-mtl/src/main/scala/forex/domain/Currency.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ object Currency {
case object SGD extends Currency
case object USD extends Currency

val supportedCurrencies : List[Currency] = List(AUD, CAD, CHF, EUR, GBP, NZD, JPY, SGD, USD)

implicit val show: Show[Currency] = Show.show {
case AUD => "AUD"
case CAD => "CAD"
Expand Down
14 changes: 14 additions & 0 deletions forex-mtl/src/main/scala/forex/domain/Tasks.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package forex.domain

sealed trait Tasks {
protected var taskName: String = null
protected def setName() : Unit
def name() : String = this.taskName
}

object Tasks {
case object FOREX_JOB extends Tasks {
override def setName(): Unit = this.taskName = "FOREX_JOB"
}
}

7 changes: 7 additions & 0 deletions forex-mtl/src/main/scala/forex/http/rates/Protocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package rates
import forex.domain.Currency.show
import forex.domain.Rate.Pair
import forex.domain._
import forex.programs.rates.errors
import io.circe._
import io.circe.generic.extras.Configuration
import io.circe.generic.extras.semiauto.deriveConfiguredEncoder
Expand Down Expand Up @@ -36,4 +37,10 @@ object Protocol {
implicit val responseEncoder: Encoder[GetApiResponse] =
deriveConfiguredEncoder[GetApiResponse]

implicit val errorEncoder: Encoder[errors.Error] =
Encoder.instance {
case error: errors.Error.RateLookupFailed => rateLookupFailedEncoder(error)
}
implicit val rateLookupFailedEncoder: Encoder[errors.Error.RateLookupFailed] =
deriveConfiguredEncoder[errors.Error.RateLookupFailed] _
}
21 changes: 18 additions & 3 deletions forex-mtl/src/main/scala/forex/http/rates/RatesHttpRoutes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,37 @@ package rates

import cats.effect.Sync
import cats.syntax.flatMap._
import forex.domain.Rate
import forex.programs.RatesProgram
import forex.programs.rates.{ Protocol => RatesProgramProtocol }
import forex.programs.rates.{ErrorCodes, errors, Protocol => RatesProgramProtocol}
import io.circe.syntax.EncoderOps
import org.http4s.HttpRoutes
import org.http4s.dsl.Http4sDsl
import org.http4s.server.Router
import org.log4s.{Logger, getLogger}


class RatesHttpRoutes[F[_]: Sync](rates: RatesProgram[F]) extends Http4sDsl[F] {

import Converters._, QueryParams._, Protocol._

private[http] val prefixPath = "/rates"

val logger: Logger = getLogger(getClass)

private val httpRoutes: HttpRoutes[F] = HttpRoutes.of[F] {
case GET -> Root :? FromQueryParam(from) +& ToQueryParam(to) =>
rates.get(RatesProgramProtocol.GetRatesRequest(from, to)).flatMap(Sync[F].fromEither).flatMap { rate =>
Ok(rate.asGetApiResponse)
try {
val response: F[Either[errors.Error, Rate]] = rates.get(RatesProgramProtocol.GetRatesRequest(from, to))
response.flatMap {
case Right(_) => response.flatMap(Sync[F].fromEither).flatMap { rate => Ok(rate.asGetApiResponse) }
case Left(value) => BadRequest(value.asJson)
}
} catch {
case exception: Exception =>
logger.error(exception)("Exception Occured in Forex Service.")
val error: errors.Error = errors.Error.RateLookupFailed(ErrorCodes.internalError, "Rate Lookup Failed due to " + exception.getMessage())
BadRequest(error.asJson)
}
}

Expand Down
2 changes: 1 addition & 1 deletion forex-mtl/src/main/scala/forex/programs/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ package forex
package object programs {
type RatesProgram[F[_]] = rates.Algebra[F]
final val RatesProgram = rates.Program
}
}
10 changes: 10 additions & 0 deletions forex-mtl/src/main/scala/forex/programs/rates/ErrorCodes.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package forex.programs.rates

object ErrorCodes {
final val cachePopulationFailed: String = "FX_CacheNotPopulated"
final val cacheFetchFailed: String = "FX_CacheFetchFailed"
final val cacheInitFailed: String = "FX_CachecInitializationFailed"
final val cacheRefreshFailed: String = "FX_CacheRefreshFailed"
final val fxRateLookUpFailed: String = "FX_RatesFetchFailed"
final val internalError: String = "FX_InternalError"
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class Program[F[_]: Functor](
) extends Algebra[F] {

override def get(request: Protocol.GetRatesRequest): F[Error Either Rate] =
EitherT(ratesService.get(Rate.Pair(request.from, request.to))).leftMap(toProgramError(_)).value
EitherT(ratesService.get(Rate.Pair(request.from, request.to))).leftMap(toProgramError).value

}

Expand Down
9 changes: 6 additions & 3 deletions forex-mtl/src/main/scala/forex/programs/rates/errors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import forex.services.rates.errors.{ Error => RatesServiceError }

object errors {

sealed trait Error extends Exception
class Error(code: String, message: String) extends Exception {
override def toString(): String = s"Code: $code, Message: $message"
}
object Error {
final case class RateLookupFailed(msg: String) extends Error
final case class RateLookupFailed(code:String, msg: String) extends Error(code, msg)
}

def toProgramError(error: RatesServiceError): Error = error match {
case RatesServiceError.OneFrameLookupFailed(msg) => Error.RateLookupFailed(msg)
case RatesServiceError.OneFrameLookupFailed(code, msg) => Error.RateLookupFailed(code, msg)
case RatesServiceError.RateLookupFailed(code, msg) => Error.RateLookupFailed(code, msg)
}
}
3 changes: 2 additions & 1 deletion forex-mtl/src/main/scala/forex/services/rates/errors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ object errors {

sealed trait Error
object Error {
final case class OneFrameLookupFailed(msg: String) extends Error
final case class OneFrameLookupFailed(code:String, msg: String) extends Error
final case class RateLookupFailed(code:String, msg: String) extends Error
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,29 @@ import forex.services.rates.Algebra
import cats.Applicative
import cats.syntax.applicative._
import cats.syntax.either._
import forex.domain.{ Price, Rate, Timestamp }
import forex.domain.{Price, Rate, Timestamp}
import forex.services.rates.errors._
import forex.components._
import forex.components.cache.redis.Segments


class OneFrameDummy[F[_]: Applicative] extends Algebra[F] {

override def get(pair: Rate.Pair): F[Error Either Rate] =
Rate(pair, Price(BigDecimal(100)), Timestamp.now).asRight[Error].pure[F]
private final val redisAPI: Cache = CacheAPI.redis(Segments.forexRates)

override def get(pair: Rate.Pair): F[Error Either Rate] = {
val formattedCacheKey: String = getFormattedCacheKey(pair)
redisAPI.get(formattedCacheKey) match {
case Right(value) => Rate(pair, Price(BigDecimal(value.toDouble)), Timestamp.now)
.asRight[Error]
.pure[F]
case Left(value) => value
.asLeft[Rate]
.pure[F]
}
}

private def getFormattedCacheKey(pair: Rate.Pair) : String = {
pair.from.toString + "&" + pair.to.toString
}
}
Loading

0 comments on commit 81de7fe

Please sign in to comment.