Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PATCH] Amended Forex Service and Added Refresh Task #70

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions forex-mtl/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,10 @@ libraryDependencies ++= Seq(
Libraries.scalaCheck % Test,
Libraries.catsScalaCheck % Test
)

libraryDependencies ++= Seq(
"net.debasishg" %% "redisclient" % "3.41"
)

libraryDependencies += "com.lihaoyi" %% "requests" % "0.8.0"

5 changes: 4 additions & 1 deletion forex-mtl/src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
app {
http {
host = "0.0.0.0"
port = 8080
port = 8081
timeout = 40 seconds
}
oneframe {
token = "10dc303535874aeccc86a8251e6992f5"
}
}

12 changes: 8 additions & 4 deletions forex-mtl/src/main/scala/forex/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@ import org.http4s.blaze.server.BlazeServerBuilder

object Main extends IOApp {

override def run(args: List[String]): IO[ExitCode] =
new Application[IO].stream(executionContext).compile.drain.as(ExitCode.Success)

override def run(args: List[String]): IO[ExitCode] = {
val startupTask = IO {StartupTasks.process()}
val app: IO[ExitCode] = new Application[IO].stream(executionContext).compile.drain.as(ExitCode.Success)
for {
_ <- startupTask.start //This will run async and starts another task after 4minutes to refresh cache...
exitCode <- app //This will run the app in current thread
} yield { exitCode}
}
}

class Application[F[_]: ConcurrentEffect: Timer] {
Expand All @@ -24,5 +29,4 @@ class Application[F[_]: ConcurrentEffect: Timer] {
.withHttpApp(module.httpApp)
.serve
} yield ()

}
12 changes: 12 additions & 0 deletions forex-mtl/src/main/scala/forex/StartupTasks.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package forex

import cats.effect.{ExitCode, IO}
import forex.components.scheduler.tasks.TaskScheduler
import forex.thirdparties.oneframae.OneFrameForexRatesHandler

object StartupTasks {
def process(): IO[ExitCode] = {
TaskScheduler.scheduleForexCacheRefreshJob
IO(OneFrameForexRatesHandler.handleCache()).as(ExitCode.Success)
}
}
11 changes: 11 additions & 0 deletions forex-mtl/src/main/scala/forex/components/cache/Algebra.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package forex.components.cache

import forex.services.rates.errors._


trait Algebra {

def put[A](key:String, value: A) : Boolean

def get(key:String) : Either[Error, String]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package forex.components.cache

import forex.components.cache.redis.Redis

object Protocol {
def redis(segment: String) : Algebra = new Redis(segment)
}
24 changes: 24 additions & 0 deletions forex-mtl/src/main/scala/forex/components/cache/redis/Redis.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package forex.components.cache.redis

import com.redis.RedisClient
import forex.components.cache.Algebra
import forex.programs.rates.ErrorCodes
import forex.services.rates.errors._

class Redis(segment: String) extends Algebra {

private[redis] lazy val redisClient: RedisClient = new RedisClient("localhost", 6379)

override def put[A](key: String, value: A): Boolean = {
redisClient.set(key = ForexRedisHelper.getSegmentPrefixedKey(segment, key), value = value)
}

override def get(key:String): Either[Error, String] = {
redisClient.get(ForexRedisHelper.getSegmentPrefixedKey(segment, key)) match {
case Some(value) => Right(value).withLeft[Error]
case None => Left(Error.RateLookupFailed(ErrorCodes.cacheFetchFailed, "Value Not Found In Cache"))
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package forex.components.cache.redis

abstract class RedisHelper {
def getSegmentPrefixedKey(segement: String, key:String): String
}

object Segments {
final val forexRates = "FOREX_RATES"
}

object ForexRedisHelper extends RedisHelper {
override def getSegmentPrefixedKey(segment: String, key: String): String = segment ++ "::" ++ key

def getFormattedKey(from: String, to: String): String = {
from + "&" + to
}
}
8 changes: 8 additions & 0 deletions forex-mtl/src/main/scala/forex/components/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package forex

import forex.components.cache.Algebra

package object components {
type Cache = Algebra
final val CacheAPI = cache.Protocol
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package forex.components.scheduler.tasks

import cats.effect.{Concurrent, IO, IOApp, Timer}
import cats.implicits.toFlatMapOps
import forex.domain.Tasks
import forex.thirdparties.oneframae.OneFrameForexRatesHandler
import org.log4s.{Logger, getLogger}

import scala.concurrent.duration._

class TaskScheduler(task: Tasks, f: IO[Unit], frequency: FiniteDuration) extends IOApp .Simple {

val logger: Logger = getLogger(getClass)

private def schedule[F[_] : Concurrent : Timer](task: F[Unit], frequency: FiniteDuration) : F[Unit] = {
Timer[F].sleep(frequency).flatMap(_ => task)
}

private[tasks] def scheduleJob(): Unit = {
if (TasksLibrary.isTaskSpawned(this.task)) {
logger.info("Job Already Spawned...")
} else {
schedule[IO](this.f, this.frequency)
TasksLibrary.addSpawnedTask(task) match {
case Some(_) => logger.info("Task Successfully Added To Task Library.")
case None => logger.warn("Task Addition to Library Failed. Audit and fix the issue.")
}
}
}

override def run: IO[Unit] = {
IO(scheduleJob())
}
}

object TaskScheduler {
def apply(task: Tasks, f: IO[Unit], frequency: FiniteDuration): TaskScheduler = {
TaskScheduler(task, f, frequency)
}

def scheduleForexCacheRefreshJob = {
TaskScheduler(Tasks.FOREX_JOB, OneFrameForexRatesHandler.handleCache(), 4.minutes)
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package forex.components.scheduler.tasks

import forex.domain.Tasks

import scala.collection.mutable

object TasksLibrary {
private val tasksSpawned: mutable.Map[String, Boolean] = mutable.Map[String, Boolean]()

def isTaskSpawned(task: Tasks) : Boolean = tasksSpawned(task.name())

private[tasks] def addSpawnedTask(task: Tasks) : Option[Boolean] = tasksSpawned.put(task.name(), true)
}
2 changes: 2 additions & 0 deletions forex-mtl/src/main/scala/forex/domain/Currency.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ object Currency {
case object SGD extends Currency
case object USD extends Currency

val supportedCurrencies : Set[Currency] = Set(AUD, CAD, CHF, EUR, GBP, NZD, JPY, SGD, USD)

implicit val show: Show[Currency] = Show.show {
case AUD => "AUD"
case CAD => "CAD"
Expand Down
14 changes: 14 additions & 0 deletions forex-mtl/src/main/scala/forex/domain/Tasks.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package forex.domain

sealed trait Tasks {
protected var taskName: String = null
protected def setName() : Unit
def name() : String = this.taskName
}

object Tasks {
case object FOREX_JOB extends Tasks {
override def setName(): Unit = this.taskName = "FOREX_JOB"
}
}

7 changes: 7 additions & 0 deletions forex-mtl/src/main/scala/forex/http/rates/Protocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package rates
import forex.domain.Currency.show
import forex.domain.Rate.Pair
import forex.domain._
import forex.programs.rates.errors
import io.circe._
import io.circe.generic.extras.Configuration
import io.circe.generic.extras.semiauto.deriveConfiguredEncoder
Expand Down Expand Up @@ -36,4 +37,10 @@ object Protocol {
implicit val responseEncoder: Encoder[GetApiResponse] =
deriveConfiguredEncoder[GetApiResponse]

implicit val errorEncoder: Encoder[errors.Error] =
Encoder.instance {
case error: errors.Error.RateLookupFailed => rateLookupFailedEncoder(error)
}
implicit val rateLookupFailedEncoder: Encoder[errors.Error.RateLookupFailed] =
deriveConfiguredEncoder[errors.Error.RateLookupFailed] _
}
21 changes: 18 additions & 3 deletions forex-mtl/src/main/scala/forex/http/rates/RatesHttpRoutes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,37 @@ package rates

import cats.effect.Sync
import cats.syntax.flatMap._
import forex.domain.Rate
import forex.programs.RatesProgram
import forex.programs.rates.{ Protocol => RatesProgramProtocol }
import forex.programs.rates.{ErrorCodes, errors, Protocol => RatesProgramProtocol}
import io.circe.syntax.EncoderOps
import org.http4s.HttpRoutes
import org.http4s.dsl.Http4sDsl
import org.http4s.server.Router
import org.log4s.{Logger, getLogger}


class RatesHttpRoutes[F[_]: Sync](rates: RatesProgram[F]) extends Http4sDsl[F] {

import Converters._, QueryParams._, Protocol._

private[http] val prefixPath = "/rates"

val logger: Logger = getLogger(getClass)

private val httpRoutes: HttpRoutes[F] = HttpRoutes.of[F] {
case GET -> Root :? FromQueryParam(from) +& ToQueryParam(to) =>
rates.get(RatesProgramProtocol.GetRatesRequest(from, to)).flatMap(Sync[F].fromEither).flatMap { rate =>
Ok(rate.asGetApiResponse)
try {
val response: F[Either[errors.Error, Rate]] = rates.get(RatesProgramProtocol.GetRatesRequest(from, to))
response.flatMap {
case Right(_) => response.flatMap(Sync[F].fromEither).flatMap { rate => Ok(rate.asGetApiResponse) }
case Left(value) => BadRequest(value.asJson)
}
} catch {
case exception: Exception =>
logger.error(exception)("Exception Occured in Forex Service.")
val error: errors.Error = errors.Error.RateLookupFailed(ErrorCodes.internalError, "Rate Lookup Failed due to " + exception.getMessage())
BadRequest(error.asJson)
}
}

Expand Down
2 changes: 1 addition & 1 deletion forex-mtl/src/main/scala/forex/programs/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ package forex
package object programs {
type RatesProgram[F[_]] = rates.Algebra[F]
final val RatesProgram = rates.Program
}
}
10 changes: 10 additions & 0 deletions forex-mtl/src/main/scala/forex/programs/rates/ErrorCodes.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package forex.programs.rates

object ErrorCodes {
final val cachePopulationFailed: String = "FX_CacheNotPopulated"
final val cacheFetchFailed: String = "FX_CacheFetchFailed"
final val cacheInitFailed: String = "FX_CachecInitializationFailed"
final val cacheRefreshFailed: String = "FX_CacheRefreshFailed"
final val fxRateLookUpFailed: String = "FX_RatesFetchFailed"
final val internalError: String = "FX_InternalError"
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class Program[F[_]: Functor](
) extends Algebra[F] {

override def get(request: Protocol.GetRatesRequest): F[Error Either Rate] =
EitherT(ratesService.get(Rate.Pair(request.from, request.to))).leftMap(toProgramError(_)).value
EitherT(ratesService.get(Rate.Pair(request.from, request.to))).leftMap(toProgramError).value

}

Expand Down
9 changes: 6 additions & 3 deletions forex-mtl/src/main/scala/forex/programs/rates/errors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import forex.services.rates.errors.{ Error => RatesServiceError }

object errors {

sealed trait Error extends Exception
class Error(code: String, message: String) extends Exception {
override def toString(): String = s"Code: $code, Message: $message"
}
object Error {
final case class RateLookupFailed(msg: String) extends Error
final case class RateLookupFailed(code:String, msg: String) extends Error(code, msg)
}

def toProgramError(error: RatesServiceError): Error = error match {
case RatesServiceError.OneFrameLookupFailed(msg) => Error.RateLookupFailed(msg)
case RatesServiceError.OneFrameLookupFailed(code, msg) => Error.RateLookupFailed(code, msg)
case RatesServiceError.RateLookupFailed(code, msg) => Error.RateLookupFailed(code, msg)
}
}
3 changes: 2 additions & 1 deletion forex-mtl/src/main/scala/forex/services/rates/errors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ object errors {

sealed trait Error
object Error {
final case class OneFrameLookupFailed(msg: String) extends Error
final case class OneFrameLookupFailed(code:String, msg: String) extends Error
final case class RateLookupFailed(code:String, msg: String) extends Error
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,29 @@ import forex.services.rates.Algebra
import cats.Applicative
import cats.syntax.applicative._
import cats.syntax.either._
import forex.domain.{ Price, Rate, Timestamp }
import forex.domain.{Price, Rate, Timestamp}
import forex.services.rates.errors._
import forex.components._
import forex.components.cache.redis.Segments


class OneFrameDummy[F[_]: Applicative] extends Algebra[F] {

override def get(pair: Rate.Pair): F[Error Either Rate] =
Rate(pair, Price(BigDecimal(100)), Timestamp.now).asRight[Error].pure[F]
private final val redisAPI: Cache = CacheAPI.redis(Segments.forexRates)

override def get(pair: Rate.Pair): F[Error Either Rate] = {
val formattedCacheKey: String = getFormattedCacheKey(pair)
redisAPI.get(formattedCacheKey) match {
case Right(value) => Rate(pair, Price(BigDecimal(value.toDouble)), Timestamp.now)
.asRight[Error]
.pure[F]
case Left(value) => value
.asLeft[Rate]
.pure[F]
}
}

private def getFormattedCacheKey(pair: Rate.Pair) : String = {
pair.from.toString + "&" + pair.to.toString
}
}
Loading