I am trying to work with the Event Store JVM client in Scala; however, I am finding that every
time I run my app, it sends old messages. I am new to Akka, so I am not sure where it is persisting them or why it is re-sending them.
The expected behavior is that when I run the app, it consumes my data and sends it; however, it sends an older batch of data first.
My example code looks like:
import scala.io.Source
import scala.concurrent.duration._
import akka.actor._
import akka.actor.Status.Failure
import akka.actor.{ ActorLogging, Actor, Props, ActorSystem }
import argonaut._, Argonaut._
import eventstore._
import eventstore.tcp.ConnectionActor
import scalaz._, Scalaz._
import databazaar.csv._
/** Reads CSV rows and sends them to an Event Store connection actor as JSON events.
  *
  * Each call to [[fromCsvReader]] (directly or via [[writeFile]] / [[writeSource]])
  * creates its own `ActorSystem`; the [[WriteResult]] actor shuts that system down
  * once it receives a completion or failure message.
  */
object DataWriter {

  /** Opens the CSV file at `path` and writes its rows to the stream `streamId`. */
  def writeFile(streamId: String, path: String): Unit =
    fromCsvReader(streamId, CsvReader.open(path))

  /** Wraps `source` in a `CsvReader` and writes its rows to the stream `streamId`. */
  def writeSource(streamId: String, source: Source): Unit =
    fromCsvReader(streamId, new CsvReader(source))

  /** Lower-cases `cs` and replaces every space with an underscore. */
  private def toSnakeCase(cs: String): String = cs.toLowerCase.replace(' ', '_')

  /** Escapes `raw` so it can be embedded between double quotes in a JSON document.
    *
    * Quotes, backslashes and control characters (below U+0020) are escaped; every
    * other character is copied through unchanged.
    */
  private def escapeJson(raw: String): String = {
    val sb = new StringBuilder(raw.length + 2)
    raw.foreach {
      case '"'          => sb.append("\\\"")
      case '\\'         => sb.append("\\\\")
      case '\n'         => sb.append("\\n")
      case '\r'         => sb.append("\\r")
      case '\t'         => sb.append("\\t")
      case '\b'         => sb.append("\\b")
      case '\f'         => sb.append("\\f")
      case c if c < ' ' => sb.append("\\u%04x".format(c.toInt))
      case c            => sb.append(c)
    }
    sb.toString
  }

  /** Renders `map` as a flat JSON object of string values.
    *
    * Keys are snake-cased via [[toSnakeCase]]; keys and values are JSON-escaped so
    * that cells containing quotes or backslashes do not produce invalid JSON.
    */
  private def mapToJsonString(map: Map[String, String]): String =
    map.map { case (k, v) =>
      val key = escapeJson(toSnakeCase(k))
      val value = escapeJson(v)
      s""""$key":"$value""""
    }.mkString("{", ",", "}")

  /** Converts every row of `csvReader` to a JSON event and sends them all to the
    * stream `streamId` in a single `WriteEvents` message.
    *
    * The method returns right after the message is sent; the outcome is handled by
    * a [[WriteResult]] actor.
    */
  def fromCsvReader(streamId: String, csvReader: CsvReader): Unit = {
    val system = ActorSystem()
    val connection = system.actorOf(ConnectionActor.props())
    // Implicit so that `!` below uses this actor as the sender of the write.
    implicit val writeResult: ActorRef = system.actorOf(Props[WriteResult])

    val events = csvReader.map { row =>
      EventData(streamId + "-event", data = Content.Json(mapToJsonString(row)))
    }.toList

    connection ! WriteEvents(EventStream.Id(streamId), events)
  }

  /** Logs the outcome of a write and then shuts down the actor system. */
  class WriteResult extends Actor with ActorLogging {
    def receive: Receive = {
      case WriteEventsCompleted(range, position) =>
        log.info("range: {}, position: {}", range, position)
        context.system.shutdown()
      case Failure(e: EsException) =>
        log.error(e.toString)
        context.system.shutdown()
      // Any other failure would otherwise go unhandled and leave the system
      // (and therefore the application) running forever.
      case Failure(e) =>
        log.error(e, "Write failed with an unexpected error")
        context.system.shutdown()
    }
  }
}
``