spot7.org logo
Home PHP C# C++ Android Java Javascript Python IOS SQL HTML Categories

Sequencing Scala Futures with bounded parallelism (without messing around with ExecutorContexts)


I have example how to do it with scalaz-stream. It's quite a lot of code because it's required to convert scala Future to scalaz Task (abstraction for deferred computation). However it's required to add it to project once. Another option is to use Task for defining 'doWork'. I personally prefer task for building async programs.

  import scala.concurrent.{Future =>
SFuture}
  import scala.util.Random
  import
scala.concurrent.ExecutionContext.Implicits.global


  import scalaz.stream._
  import scalaz.concurrent._

  val P = scalaz.stream.Process

  val rnd = new Random()

  def doWork(symbol: String): SFuture[Unit] =
SFuture {
    Thread.sleep(rnd.nextInt(1000))
    println(s"Symbol: $symbol. Thread:
${Thread.currentThread().getName}")
  }

  val symbols = Seq("AAPL", "MSFT", "GOOGL",
"CVX").
    flatMap(s =>
Seq.fill(5)(s).zipWithIndex.map(t =>
s"${t._1}${t._2}"))

  implicit class Transformer[+T](fut: =>
SFuture[T]) {
    def toTask(implicit ec:
scala.concurrent.ExecutionContext): Task[T] = {
      import scala.util.{Failure, Success}
      import scalaz.syntax.either._
      Task.async {
        register =>
          fut.onComplete {
            case Success(v) =>
register(v.right)
            case Failure(ex) =>
register(ex.left)
          }
      }
    }
  }

  implicit class ConcurrentProcess[O](val process:
Process[Task, O]) {
    def concurrently[O2](concurrencyLevel: Int)(f:
Channel[Task, O, O2]): Process[Task, O2] = {
      val actions =
        process.
          zipWith(f)((data, f) => f(data))

      val nestedActions =
        actions.map(P.eval)

     
merge.mergeN(concurrencyLevel)(nestedActions)
    }
  }

  val workChannel = io.channel((s: String) =>
doWork(s).toTask)

  val process =
Process.emitAll(symbols).concurrently(5)(workChannel)

  process.run.run

When you'll have all this transformation in scope, basically all you need is:

  val workChannel = io.channel((s:
String) => doWork(s).toTask)

  val process =
Process.emitAll(symbols).concurrently(5)(workChannel)

Quite short and self-decribing


Categories : Scala

Related to : Sequencing Scala Futures with bounded parallelism (without messing around with ExecutorContexts)
Futures in Scala.js
There are 2 existing and working ExecutionContexts in scala.scalajs.concurrent.JSExecutionContext, with implicit versions in the inner object Implicits. Simply import the one that makes sense for you (probably queue, the other one not being actually asynchronous). import scala.scalajs.concurrent.JSExecutionContext.Implicits.queue // now you get to play with Futures

Categories : Javascript
Instantiate self-bounded generic in java
Not exactly an answer to your problem, but I think your problem is caused by a suboptimal design. Keep your types simple! For a binary tree, I would expect the type parameter to be the type of the data in the tree, i.e. the type of the labels of the nodes. Since you can use type variables as type parameters also, this should be exactly what you want. public class BinaryTree<E> { public

Categories : Java
Composing Futures with For Comprehension
It looks like you're trying to compose Futures with Vector. For comprehensions in scala have to all be of the same higher type, which in your case is Future. When you unroll the 'sugar' of the for comprehension, it's just calling flatMap on everything. for { categories <- categoriesFuture // I'm not sure what the return type is here, but I'm guessing it's a future as well categoryIdsWith

Categories : Scala
Keep track of completed Futures
I would suggest using the standard Java AtomicInteger. You can increment it using the incrementAndGet() method, and obtain the current value via its get() method. import java.util.concurrent.atomic.AtomicInteger ... val completed = new AtomicInteger() val futures = for(i <- 0 until nSteps) yield future { ... val content = blocking { ... http request ... } process(content) compl

Categories : Scala
bootstrap UI css messing up application css
yes there is. if you are using less for example, bootstrap provieds less files and not just css files. so you can scope the entire bootstrap css with some prefix class of yours. for example: .bootstrap-ui { @import "../bower_components/bootstrap/less/bootstrap"; } this means you can now do put bootstrap component here..... and only this div will be impacted by the bootstrap css.

Categories : CSS
Recently Add
Reverse list of n elements
scalac: Error: object CharRef in intelliJ 14
Scala - trouble with type inference in lambda expression
Exception on spark test
How can I emulate Haskell's typeclasses in Scala?
Slick: CRUD extension: How to encapsulate implicit mapping:BaseColumnType[T]
Can't find Traverse for sequencing Seq[ValidationNel[String, MyCaseClass]] => ValidationNel[String, Seq[MyCaseClass]]
Tail recursion: internal "loop" function or default values for accumulators
Scala - Add element:MyType to Array = option[MyType] expected
Scala, Composing Function with two values
Using dependent type to generate compile error
How to match all words in a sentence with scala combinators?
Parser Alternative Operator | Fails
ScalaTest assertion mismatch due to Physical Address
Scala implicit parameter and japanese smiley 'foldLeft'
Is it possible to user reduceByKey((x, y, z) => ...)?
How to implement security Authorization using scala and play?
SSO login using scala script
Sum elements based on a predicate
Keep track of completed Futures
API Observable with dynamic caching
java.io.IOException: Remotely closed in gatling
Scala permutations using two lists
Is Scala Either really a Monad
Spark: Use of distinct
Identifying two type wildcards as identical
how to package spark scala application
Pattern Match on Scala `class`
Type mismatch when using higher-kinded types
Scala List match last element
© Copyright 2017 spot7.org Publishing Limited. All rights reserved.