Compare commits
4 Commits
f1d5654bf7
...
e68617b470
Author | SHA1 | Date |
---|---|---|
Oliver Kennedy | e68617b470 | |
Oliver Kennedy | 2135576901 | |
Oliver Kennedy | c5a82e13f6 | |
Oliver Kennedy | e98314d488 |
20
README.md
20
README.md
|
@ -106,6 +106,26 @@ Utilities for working with REST APIs.
|
|||
* `RestAPI(url).poll(topic, interval = 1.hour)`: Poll the provided URL every interval and post the result to the specified MQTT topic.
|
||||
* `RestAPI(url).get`: Retrieve the current contents of the GET url as a Scala `Try[String]`
|
||||
|
||||
#### MPD
|
||||
The [music player daemon](https://www.musicpd.org/). To use the MPD integration, you need to provide access information in your config file:
|
||||
```
|
||||
"mpd" : {
|
||||
"host" : "localhost",
|
||||
}
|
||||
```
|
||||
|
||||
* `MPD.status`: A stream of the MPD status
|
||||
* `MPD.playlist`: A stream of MPD's current playlist
|
||||
* `MPD.currentTrack`: A stream of the currently playing song
|
||||
* `MPD.position`: A stream of the current timestamp in the song
|
||||
|
||||
* `MPD.play()`: Start playing the current playlist
|
||||
* `MPD.clear()`: Clear the current playlist
|
||||
* `MPD.list(path)`: List the MPD-local files at the specified path.
|
||||
* `MPD.savedPlaylists`: List all of the MPD saved playlists
|
||||
* `MPD.savedPlaylist(name)`: List all songs in the specified saved playlist
|
||||
* `MPD.append(path)`: Add the song at the specified path to the end of the playlist
|
||||
|
||||
## Compiling Shingle
|
||||
|
||||
#### Setup
|
||||
|
|
3
build.sc
3
build.sc
|
@ -20,7 +20,8 @@ object shingle extends ScalaModule
|
|||
ivy"org.slf4j:slf4j-api:1.7.36",
|
||||
ivy"ch.qos.logback:logback-classic:1.2.10",
|
||||
ivy"com.typesafe.scala-logging::scala-logging::3.9.4",
|
||||
ivy"org.scala-lang::scala3-compiler::3.1.2"
|
||||
ivy"org.scala-lang::scala3-compiler::3.1.2",
|
||||
ivy"net.domlom::websocket-scala:0.0.3".withDottyCompat(scalaVersion()),
|
||||
)
|
||||
|
||||
}
|
|
@ -1,47 +0,0 @@
|
|||
package net.okennedy.shingle
|
||||
|
||||
object Hass
|
||||
{
|
||||
val config = Shingle.config("hass").obj
|
||||
val host = config("host").str
|
||||
val token = config("token").str
|
||||
|
||||
def post(path: String, body: ujson.Value): Unit =
|
||||
{
|
||||
println(s"POST: $host/api/$path")
|
||||
println(body.render())
|
||||
requests.post(
|
||||
s"$host/api/$path",
|
||||
data = body.render(),
|
||||
headers = Map(
|
||||
"Authorization" -> s"Bearer $token",
|
||||
"Content-Type" -> "application/json"
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
def set(entity: String, state: String, attributes: (String, ujson.Value)*): Unit =
|
||||
{
|
||||
post(
|
||||
s"states/$entity",
|
||||
ujson.Obj(
|
||||
"state" -> state,
|
||||
"attributes" -> (
|
||||
if(attributes.isEmpty) { ujson.Obj() }
|
||||
else { ujson.Obj(attributes.head, attributes.tail:_*) }
|
||||
)
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
def service(domain: String, service: String, data: (String, ujson.Value)*): Unit =
|
||||
{
|
||||
post(
|
||||
s"services/$domain/$service",
|
||||
(
|
||||
if(data.isEmpty) { ujson.Obj() }
|
||||
else { ujson.Obj(data.head, data.tail:_*) }
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
|
@ -1,132 +0,0 @@
|
|||
package net.okennedy.shingle
|
||||
|
||||
import org.bff.javampd.file.MPDFile
|
||||
import org.bff.javampd.server.{ MPD => MPDServer }
|
||||
import collection.JavaConverters.collectionAsScalaIterableConverter
|
||||
import org.bff.javampd.song.MPDSong
|
||||
import org.bff.javampd.output.OutputChangeListener
|
||||
import org.bff.javampd.output.OutputChangeEvent
|
||||
import org.bff.javampd.player.PlayerChangeListener
|
||||
import org.bff.javampd.player.PlayerChangeEvent
|
||||
import org.bff.javampd.player.PlayerBasicChangeListener
|
||||
import org.bff.javampd.player.PlayerBasicChangeEvent
|
||||
import org.bff.javampd.playlist.PlaylistBasicChangeListener
|
||||
import org.bff.javampd.playlist.PlaylistBasicChangeEvent
|
||||
import org.bff.javampd.server.ConnectionChangeListener
|
||||
import org.bff.javampd.server.ConnectionChangeEvent
|
||||
import org.bff.javampd.player.TrackPositionChangeListener
|
||||
import org.bff.javampd.player.TrackPositionChangeEvent
|
||||
|
||||
object MPD
|
||||
{
|
||||
def config = Shingle.config("mpd").obj
|
||||
lazy val instance =
|
||||
MPDServer.Builder()
|
||||
.server(config("host").str)
|
||||
.build()
|
||||
lazy val topic =
|
||||
config("topic").str
|
||||
|
||||
lazy val db =
|
||||
instance.getMusicDatabase()
|
||||
|
||||
def apply(path: String): Option[MPDFile] =
|
||||
{
|
||||
var curr: Option[MPDFile] = None
|
||||
var dir: Iterable[MPDFile] = Seq.empty
|
||||
for(i <- path.split("/")){
|
||||
if(i != ""){
|
||||
dir = curr match {
|
||||
case None => db.getFileDatabase.listRootDirectory.asScala
|
||||
case Some(f) => {
|
||||
if(!f.isDirectory){ return None }
|
||||
db.getFileDatabase.listDirectory(f).asScala
|
||||
}
|
||||
}
|
||||
curr = dir.find { _.getPath.split("/").last == i }
|
||||
if(curr.isEmpty){ return None }
|
||||
}
|
||||
}
|
||||
return curr
|
||||
}
|
||||
|
||||
def savedPlaylists: Seq[String] =
|
||||
db.getPlaylistDatabase.listPlaylists.asScala.toSeq
|
||||
|
||||
def savedPlaylist(name: String): Seq[MPDSong] =
|
||||
db.getPlaylistDatabase.listPlaylistSongs(name).asScala.toSeq
|
||||
|
||||
def list(path: String): Seq[MPDFile] =
|
||||
apply(path).toSeq.flatMap { db.getFileDatabase.listDirectory(_).asScala.toSeq }
|
||||
|
||||
def append(path: String): Boolean =
|
||||
{
|
||||
val file = apply(path).getOrElse { return false }
|
||||
instance.getPlaylist.addSong(file.getPath)
|
||||
return true
|
||||
}
|
||||
|
||||
def play(): Unit =
|
||||
instance.getPlayer.play()
|
||||
|
||||
def clear(): Unit =
|
||||
instance.getPlaylist.clearPlaylist()
|
||||
|
||||
|
||||
implicit def songToJson(song: MPDSong): ujson.Obj =
|
||||
ujson.Obj(
|
||||
"title" -> song.getTitle,
|
||||
"artist" -> song.getArtistName,
|
||||
"album" -> song.getAlbumName,
|
||||
"position" -> song.getPosition
|
||||
)
|
||||
|
||||
val status = Mqtt(s"$topic/status").isState
|
||||
val playlist = Mqtt(s"$topic/playlist").isState
|
||||
val currentTrack = Mqtt(s"$topic/now_playing").isState
|
||||
val position = Mqtt(s"$topic/position").isState
|
||||
|
||||
if(config.get("proxy").map { _.bool }.getOrElse(true)){
|
||||
status.subscribe
|
||||
playlist.subscribe
|
||||
currentTrack.subscribe
|
||||
position.subscribe
|
||||
} else {
|
||||
val monitor = instance.getMonitor
|
||||
monitor.addPlayerChangeListener(new PlayerBasicChangeListener(){
|
||||
def playerBasicChange(evt: PlayerBasicChangeEvent) =
|
||||
status << (
|
||||
evt.getStatus match {
|
||||
case PlayerBasicChangeEvent.Status.PLAYER_PAUSED => "pause"
|
||||
case PlayerBasicChangeEvent.Status.PLAYER_STARTED => "play"
|
||||
case PlayerBasicChangeEvent.Status.PLAYER_STOPPED => "pause"
|
||||
case PlayerBasicChangeEvent.Status.PLAYER_UNPAUSED => "play"
|
||||
}
|
||||
)
|
||||
})
|
||||
monitor.addPlaylistChangeListener(new PlaylistBasicChangeListener(){
|
||||
def playlistBasicChange(evt: PlaylistBasicChangeEvent) =
|
||||
evt.getEvent match {
|
||||
case PlaylistBasicChangeEvent.Event.PLAYLIST_CHANGED
|
||||
| PlaylistBasicChangeEvent.Event.PLAYLIST_ENDED =>
|
||||
playlist << (
|
||||
ujson.Arr.from(
|
||||
instance.getPlaylist.getSongList.asScala.toSeq.map { songToJson(_) }
|
||||
)
|
||||
)
|
||||
case PlaylistBasicChangeEvent.Event.SONG_CHANGED =>
|
||||
currentTrack << (
|
||||
songToJson(instance.getPlaylist.getCurrentSong)
|
||||
)
|
||||
case _ => ()
|
||||
}
|
||||
})
|
||||
|
||||
monitor.addTrackPositionChangeListener(new TrackPositionChangeListener(){
|
||||
def trackPositionChanged(evt: TrackPositionChangeEvent) =
|
||||
position << evt.getElapsedTime
|
||||
})
|
||||
|
||||
monitor.start()
|
||||
}
|
||||
}
|
|
@ -1,41 +0,0 @@
|
|||
package net.okennedy.shingle
|
||||
|
||||
import net.okennedy.shingle.Shingle
|
||||
import org.fusesource.mqtt.client.QoS
|
||||
import scala.util.Try
|
||||
import scala.util.Failure
|
||||
import scala.util.Success
|
||||
import net.okennedy.shingle.stream.Owner
|
||||
import net.okennedy.shingle.cron.IntervalCronEvent
|
||||
import net.okennedy.shingle.cron.Cron
|
||||
import scala.concurrent.duration._
|
||||
|
||||
case class RestAPI(
|
||||
val url: String,
|
||||
val headers: Iterable[(String, String)] = Seq.empty,
|
||||
)
|
||||
{
|
||||
def get: Try[String] =
|
||||
{
|
||||
println(s"Fetching $url")
|
||||
val result = requests.get(
|
||||
url = url,
|
||||
headers = headers,
|
||||
)
|
||||
|
||||
if(result.statusCode == 200){ Success(result.string()) }
|
||||
else { Failure(new Exception(s"Result ${result.statusCode} when fetching $url")) }
|
||||
}
|
||||
|
||||
def poll(topic: MqttPath, interval: FiniteDuration = 1.hour)(implicit owner: Owner) =
|
||||
Cron.add(
|
||||
new IntervalCronEvent(interval) {
|
||||
// override def immediate = true
|
||||
def trigger() =
|
||||
get match {
|
||||
case Success(r) => topic << r
|
||||
case Failure(err) => println(s"Error fetching $url: $err")
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
|
@ -6,6 +6,9 @@ import net.okennedy.shingle.module.Module
|
|||
import java.util.Calendar
|
||||
import cask.main.Routes
|
||||
import net.okennedy.shingle.cron.Cron
|
||||
import net.okennedy.shingle.component.Component
|
||||
import java.io.FileNotFoundException
|
||||
import scala.collection.mutable
|
||||
|
||||
object Shingle
|
||||
extends cask.Main
|
||||
|
@ -14,123 +17,49 @@ object Shingle
|
|||
override def allRoutes: Seq[Routes] = Seq(Webserver)
|
||||
override def port: Int = 4000
|
||||
|
||||
val config = ujson.read(Source.fromFile(
|
||||
System.getProperty("user.home") + File.separator + ".shingle" + File.separator + "config.json"
|
||||
).getLines.mkString("\n"))
|
||||
val configFile = System.getProperty("user.home") + File.separator + ".shingle" + File.separator + "config.json"
|
||||
val config =
|
||||
try {
|
||||
ujson.read(Source.fromFile(configFile)
|
||||
.getLines
|
||||
.mkString("\n"))
|
||||
.obj
|
||||
} catch {
|
||||
case f: FileNotFoundException =>
|
||||
throw new RuntimeException(
|
||||
s"No shingle config file at $configFile"
|
||||
)
|
||||
}
|
||||
|
||||
override def main(args: Array[String]): Unit =
|
||||
{
|
||||
println("Welcome to Shingle\nInitializing libraries...")
|
||||
println("Welcome to Shingle\nInitializing components...")
|
||||
|
||||
println("...Mqtt")
|
||||
// Mqtt.start()
|
||||
println("...MPD")
|
||||
// MPD
|
||||
println("...Hass")
|
||||
// Hass
|
||||
println("...Weather")
|
||||
// Weather
|
||||
println("...Cron")
|
||||
val active = mutable.Set[String]()
|
||||
for(component <- Component.all){
|
||||
config.get(component.slug)
|
||||
.map { componentConfig =>
|
||||
val missingDeps = component.depends -- active
|
||||
if(missingDeps.isEmpty)
|
||||
{
|
||||
component.init(componentConfig)
|
||||
active.add(component.slug)
|
||||
println(s" ... initialized ${component.name}")
|
||||
} else {
|
||||
println(s" ... skipping ${component.name}; Missing dependencies on ${missingDeps.map { Component(_).name }.mkString(", ")}")
|
||||
}
|
||||
}.getOrElse {
|
||||
println(s" ... Missing configuration for ${component.name}")
|
||||
}
|
||||
}
|
||||
|
||||
println("Initializing timer thread...")
|
||||
Cron.handler.start()
|
||||
println("Installing user modules...")
|
||||
Module.init()
|
||||
println("Shingle is running.")
|
||||
println(s" ... admin interface on http://localhost:${port}/api")
|
||||
|
||||
super.main(args)
|
||||
|
||||
|
||||
// Module("TIME_INTENTS") { implicit ctx =>
|
||||
// var kitchenTimer: Timer = new Timer(Mqtt("timer/kitchen"))
|
||||
|
||||
// kitchenTimer.done.trigger {
|
||||
// _ => Hermes.say("The timer is done")
|
||||
// }
|
||||
|
||||
// Hermes.registerIntent("SetTimer"){ slots =>
|
||||
// val t = slots("count").num * slots("unit").num
|
||||
|
||||
// val (duration, _) = Seq(
|
||||
// (3600, "hour"),
|
||||
// (60, "minute"),
|
||||
// (1, "second")
|
||||
// ).foldLeft ( (Seq[String](), t.toLong) ) {
|
||||
// case ( (accum, remainder), (unit, label) ) =>
|
||||
// if(remainder > unit){
|
||||
// val diff = (remainder / unit).toLong
|
||||
// (
|
||||
// accum :+ s"$diff ${label}${if(diff > 1){ "s" } else { "" }}",
|
||||
// remainder % unit
|
||||
// )
|
||||
// } else {
|
||||
// (accum, remainder)
|
||||
// }
|
||||
// }
|
||||
|
||||
// Hermes.say(s"Setting timer for ${duration.mkString(", ")}")
|
||||
|
||||
// kitchenTimer.go(
|
||||
// end = java.time.LocalDateTime.now.plusSeconds(t.toLong)
|
||||
// )
|
||||
// }
|
||||
// }
|
||||
|
||||
// Module("GARAGE_SENSORS") { implicit ctx =>
|
||||
// def notify(msg: String) =
|
||||
// {
|
||||
// Hermes.say(msg)
|
||||
// Hass.service("notify", "matrix_home", "message" -> msg)
|
||||
// }
|
||||
|
||||
// // Mqtt("sensor/garage/door")
|
||||
// // .onAnyChange
|
||||
// // .asString
|
||||
// // .trigger {
|
||||
// // // temporary bug in the sensor code flips the states...
|
||||
// // case "closed" => notify("The garage door is closed")
|
||||
// // case "open" => notify("The garage door is open")
|
||||
// // case _ => ()
|
||||
// // }
|
||||
|
||||
// Mqtt("sensor/garage/distance")
|
||||
// .asString
|
||||
// .map {
|
||||
// case "borked" => "unknown"
|
||||
// case x if x.toInt < 200 => "parked"
|
||||
// case x if x.toInt < 250 => "parking-near"
|
||||
// case x if x.toInt < 300 => "parking-far"
|
||||
// case _ => "empty"
|
||||
// }
|
||||
// .aliasAsString("sensor/garage/parking")
|
||||
// .onAnyChange
|
||||
// .asJson[String]
|
||||
// .debounce(5000)
|
||||
// .trigger {
|
||||
// case "parked" => notify("The car is parked")
|
||||
// case "empty" => notify("The car has departed")
|
||||
// case _ => ()
|
||||
// }
|
||||
// }
|
||||
|
||||
// Module("GARAGE_LIGHT"){ implicit ctx =>
|
||||
// val lights = Seq(
|
||||
// "red", "yellow", "green"
|
||||
// ).map { x => Mqtt(s"light/garage/tower/$x") }
|
||||
// def setLight(target: Int) =
|
||||
// for((light, idx) <- lights.zipWithIndex){
|
||||
// if(idx == target) { light << "on" }
|
||||
// else { light << "off" }
|
||||
// }
|
||||
// Mqtt("sensor/garage/parking")
|
||||
// .asString
|
||||
// .onAnyChange
|
||||
// .join(Mqtt("sensor/garage/door").onAnyChange.asString)
|
||||
// .trigger {
|
||||
// case ("parked" , "open") => setLight(0)
|
||||
// case ("parking-near", "open") => setLight(1)
|
||||
// case ("parking-far" , "open") => setLight(2)
|
||||
// case (_ , _) => setLight(3)
|
||||
// }
|
||||
// }
|
||||
|
||||
}
|
||||
}
|
|
@ -1,46 +0,0 @@
|
|||
package net.okennedy.shingle
|
||||
|
||||
import upickle.default._
|
||||
import net.okennedy.shingle.cron.IntervalCronEvent
|
||||
import java.time.Duration
|
||||
import net.okennedy.shingle.cron.Cron
|
||||
import net.okennedy.shingle.stream.Owner
|
||||
import scala.util.Success
|
||||
import scala.util.Failure
|
||||
|
||||
object Weather
|
||||
{
|
||||
val api =
|
||||
RestAPI(
|
||||
url = "https://api.weather.gov/gridpoints/BUF/39,52/forecast?units=us"
|
||||
)
|
||||
|
||||
val state = Mqtt("status/weather").isState.subscribe
|
||||
|
||||
def forecast =
|
||||
ujson.transform(
|
||||
state.json("properties")("periods"),
|
||||
reader[Seq[WeatherPrediction]]
|
||||
)
|
||||
|
||||
api.get match {
|
||||
case Success(r) => state << r
|
||||
case Failure(exception) => s"Error on initial weather fetch: ${exception}"
|
||||
}
|
||||
api.poll(state)(Owner.global)
|
||||
}
|
||||
case class WeatherPrediction(
|
||||
name: String,
|
||||
isDaytime: Boolean,
|
||||
temperature: Double,
|
||||
temperatureUnit: String,
|
||||
windSpeed: String,
|
||||
windDirection: String,
|
||||
icon: String,
|
||||
shortForecast: String,
|
||||
detailedForecast: String
|
||||
)
|
||||
object WeatherPrediction
|
||||
{
|
||||
implicit val rw: ReadWriter[WeatherPrediction] = macroRW[WeatherPrediction]
|
||||
}
|
|
@ -19,14 +19,26 @@ object Webserver extends cask.Routes
|
|||
Module.read(id)
|
||||
}
|
||||
|
||||
@cask.get("api/load/:id")
|
||||
@cask.get("api/modules/:id/load")
|
||||
def saveModule(id: String) = {
|
||||
Module.reload(id)
|
||||
try {
|
||||
Module.reload(id)
|
||||
"Success"
|
||||
} catch {
|
||||
case t: Throwable =>
|
||||
t.toString()
|
||||
}
|
||||
}
|
||||
|
||||
@cask.get("api/unload/:id")
|
||||
@cask.get("api/modules/:id/unload")
|
||||
def rmModule(id: String) = {
|
||||
Module.unload(id)
|
||||
try {
|
||||
Module.unload(id)
|
||||
"Success"
|
||||
} catch {
|
||||
case t: Throwable =>
|
||||
t.toString()
|
||||
}
|
||||
}
|
||||
|
||||
@cask.get("api/active")
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
package net.okennedy.shingle.component
|
||||
|
||||
import net.okennedy.shingle.Shingle
|
||||
|
||||
trait Component
|
||||
{
|
||||
def name: String
|
||||
def depends: Set[String] = Set.empty
|
||||
def slug: String
|
||||
def setup(): Unit = {}
|
||||
|
||||
var __config: Option[ujson.Value] = None
|
||||
def config: ujson.Value =
|
||||
__config.getOrElse {
|
||||
throw new RuntimeException(s"Component $name: has not been initialized. Add a `$slug` field to the configuration file (${Shingle.configFile}")
|
||||
}
|
||||
|
||||
def init(newConfig: ujson.Value): Unit =
|
||||
{
|
||||
__config = Some(newConfig)
|
||||
setup()
|
||||
}
|
||||
}
|
||||
|
||||
object Component
|
||||
{
|
||||
val all = Seq[Component](
|
||||
Mqtt,
|
||||
Hass,
|
||||
Hermes,
|
||||
MPD,
|
||||
Weather,
|
||||
Notifications
|
||||
)
|
||||
|
||||
val byName = all.map { x => x.name -> x }.toMap
|
||||
|
||||
def apply(slug: String) =
|
||||
byName(slug)
|
||||
|
||||
}
|
|
@ -0,0 +1,173 @@
|
|||
package net.okennedy.shingle.component
|
||||
|
||||
import net.domlom.websocket._
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration._
|
||||
import upickle.default._
|
||||
import net.okennedy.shingle.stream.Checkpoint
|
||||
import scala.collection.concurrent
|
||||
import scala.util.Failure
|
||||
import scala.util.Success
|
||||
|
||||
object Hass extends Component
|
||||
{
|
||||
implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global
|
||||
|
||||
def name = "Home Assistant"
|
||||
def slug = "hass"
|
||||
|
||||
lazy val host = config("host").str
|
||||
lazy val token = config("token").str
|
||||
|
||||
def post(path: String, body: ujson.Value): Unit =
|
||||
{
|
||||
println(s"POST: $host/api/$path")
|
||||
println(body.render())
|
||||
requests.post(
|
||||
s"$host/api/$path",
|
||||
data = body.render(),
|
||||
headers = Map(
|
||||
"Authorization" -> s"Bearer $token",
|
||||
"Content-Type" -> "application/json"
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
def set(entity: String, state: String, attributes: (String, ujson.Value)*): Unit =
|
||||
{
|
||||
post(
|
||||
s"states/$entity",
|
||||
ujson.Obj(
|
||||
"state" -> state,
|
||||
"attributes" -> (
|
||||
if(attributes.isEmpty) { ujson.Obj() }
|
||||
else { ujson.Obj(attributes.head, attributes.tail:_*) }
|
||||
)
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
def service(domain: String, service: String, data: (String, ujson.Value)*): Unit =
|
||||
{
|
||||
post(
|
||||
s"services/$domain/$service",
|
||||
(
|
||||
if(data.isEmpty) { ujson.Obj() }
|
||||
else { ujson.Obj(data.head, data.tail:_*) }
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
def apply(entity: String): Checkpoint[String] =
|
||||
{
|
||||
socket // trigger the socket connection if needed
|
||||
entities.getOrElseUpdate(entity, new Checkpoint[String]("unknown"))
|
||||
}
|
||||
|
||||
private val entities = concurrent.TrieMap[String, Checkpoint[String]]()
|
||||
|
||||
|
||||
private var commandId: Long = 0
|
||||
|
||||
lazy val socket =
|
||||
{
|
||||
var url = s"$host/api/websocket"
|
||||
if(url.startsWith("http")){
|
||||
url = "ws"+url.drop(4)
|
||||
} else if(!url.startsWith("wss")) {
|
||||
url = "ws://"+url
|
||||
}
|
||||
println(s"Hass websocket ($url) initializing")
|
||||
val socket = Websocket(url,
|
||||
WebsocketBehavior(
|
||||
onOpen = { _ => println("Hass websocket open") },
|
||||
onMessage = { (_, message) =>
|
||||
try {
|
||||
val msg = ujson.read(message.value)
|
||||
// println(msg)
|
||||
msg("type").str match {
|
||||
case "auth_required" => login()
|
||||
case "auth_ok" => subscribe()
|
||||
case "auth_invalid" =>
|
||||
println("Hass login invalid. Disabling websocket.")
|
||||
close()
|
||||
case "event" =>
|
||||
val event = msg("event")
|
||||
event("event_type").str match {
|
||||
case "state_changed" =>
|
||||
val entity = event("data")("entity_id").str
|
||||
val state = event("data")("new_state")("state")
|
||||
// println(s"Hass: $entity <- $state")
|
||||
apply(entity).receive(state.str)
|
||||
}
|
||||
case "result" =>
|
||||
() // ignore me for now
|
||||
case _ =>
|
||||
println(s"Hass sent a message of an unknown type: $msg")
|
||||
}
|
||||
} catch {
|
||||
case t: Throwable =>
|
||||
println("Error while processing a Hass message:")
|
||||
t.printStackTrace()
|
||||
}
|
||||
},
|
||||
onClose = { reason =>
|
||||
println("Hass websocket disconnected. \n$reason\n ... Attempting reconnect in 10s")
|
||||
reconnect()
|
||||
},
|
||||
onError = { (_, err) =>
|
||||
println(err.getMessage)
|
||||
}
|
||||
),
|
||||
debugMode = false
|
||||
)
|
||||
socket.connect() match {
|
||||
case Success(msg) => println(s"Hass: $msg")
|
||||
case Failure(err) => println(s"Error connecting Hass Websocket ${err.getMessage()}");
|
||||
}
|
||||
/* return */ socket
|
||||
}
|
||||
|
||||
private def reconnect(delay: Duration = 10.seconds): Unit =
|
||||
{
|
||||
Future {
|
||||
Thread.sleep(delay.toMillis)
|
||||
socket.connect()
|
||||
}
|
||||
}
|
||||
|
||||
private def login(): Unit =
|
||||
{
|
||||
socket.sendSync(
|
||||
ujson.Obj(
|
||||
"type" -> "auth",
|
||||
"access_token" -> token
|
||||
).toString
|
||||
)
|
||||
}
|
||||
|
||||
private def close(): Unit =
|
||||
socket.close()
|
||||
|
||||
private def sendCommand(args: (String, ujson.Value)*): Unit =
|
||||
{
|
||||
commandId += 1
|
||||
socket.sendSync(
|
||||
ujson.Obj(
|
||||
"id" -> commandId,
|
||||
args:_*
|
||||
).toString
|
||||
)
|
||||
}
|
||||
|
||||
private def subscribe(): Unit =
|
||||
{
|
||||
println("Hass websocket subscribing to events")
|
||||
sendCommand(
|
||||
"type" -> ujson.Str("subscribe_events"),
|
||||
"event_type" -> ujson.Str("state_changed")
|
||||
)
|
||||
}
|
||||
|
||||
private case class Login(api_password: String, `type`:String = "auth")
|
||||
}
|
|
@ -1,13 +1,17 @@
|
|||
package net.okennedy.shingle
|
||||
package net.okennedy.shingle.component
|
||||
|
||||
import upickle.default._
|
||||
import net.okennedy.shingle.stream.Owner
|
||||
|
||||
object Hermes
|
||||
object Hermes extends Component
|
||||
{
|
||||
|
||||
def name = "Hermes Voice Assistant Integration"
|
||||
def slug = "hermes"
|
||||
override def depends = Set(Mqtt.slug)
|
||||
|
||||
lazy val defaults =
|
||||
Shingle.config("hermes")("default")
|
||||
config("default")
|
||||
|
||||
lazy val defaultSiteId =
|
||||
defaults("siteId").str
|
|
@ -0,0 +1,161 @@
|
|||
package net.okennedy.shingle.component
|
||||
|
||||
import org.bff.javampd.file.MPDFile
|
||||
import org.bff.javampd.server.{ MPD => MPDServer }
|
||||
import collection.JavaConverters.collectionAsScalaIterableConverter
|
||||
import org.bff.javampd.song.MPDSong
|
||||
import org.bff.javampd.output.OutputChangeListener
|
||||
import org.bff.javampd.output.OutputChangeEvent
|
||||
import org.bff.javampd.player.PlayerChangeListener
|
||||
import org.bff.javampd.player.PlayerChangeEvent
|
||||
import org.bff.javampd.player.PlayerBasicChangeListener
|
||||
import org.bff.javampd.player.PlayerBasicChangeEvent
|
||||
import org.bff.javampd.playlist.PlaylistBasicChangeListener
|
||||
import org.bff.javampd.playlist.PlaylistBasicChangeEvent
|
||||
import org.bff.javampd.server.ConnectionChangeListener
|
||||
import org.bff.javampd.server.ConnectionChangeEvent
|
||||
import org.bff.javampd.player.TrackPositionChangeListener
|
||||
import org.bff.javampd.player.TrackPositionChangeEvent
|
||||
import net.okennedy.shingle.stream.Checkpoint
|
||||
import net.okennedy.shingle.stream.Owner
|
||||
import upickle.default._
|
||||
|
||||
object MPD
|
||||
extends Component
|
||||
{
|
||||
def name = "Music Player Daemon"
|
||||
def slug = "mpd"
|
||||
override def depends = Set(Mqtt.slug)
|
||||
|
||||
lazy val instance =
|
||||
MPDServer.Builder()
|
||||
.server(config("host").str)
|
||||
.build()
|
||||
lazy val topic =
|
||||
config("topic").str
|
||||
|
||||
lazy val db =
|
||||
instance.getMusicDatabase()
|
||||
|
||||
def apply(path: String): Option[MPDFile] =
|
||||
{
|
||||
var curr: Option[MPDFile] = None
|
||||
var dir: Iterable[MPDFile] = Seq.empty
|
||||
for(i <- path.split("/")){
|
||||
if(i != ""){
|
||||
dir = curr match {
|
||||
case None => db.getFileDatabase.listRootDirectory.asScala
|
||||
case Some(f) => {
|
||||
if(!f.isDirectory){ return None }
|
||||
db.getFileDatabase.listDirectory(f).asScala
|
||||
}
|
||||
}
|
||||
curr = dir.find { _.getPath.split("/").last == i }
|
||||
if(curr.isEmpty){ return None }
|
||||
}
|
||||
}
|
||||
return curr
|
||||
}
|
||||
|
||||
def savedPlaylists: Seq[String] =
|
||||
db.getPlaylistDatabase.listPlaylists.asScala.toSeq
|
||||
|
||||
def savedPlaylist(name: String): Seq[MPDSong] =
|
||||
db.getPlaylistDatabase.listPlaylistSongs(name).asScala.toSeq
|
||||
|
||||
def list(path: String): Seq[MPDFile] =
|
||||
apply(path).toSeq.flatMap { db.getFileDatabase.listDirectory(_).asScala.toSeq }
|
||||
|
||||
def append(path: String): Boolean =
|
||||
{
|
||||
val file = apply(path).getOrElse { return false }
|
||||
instance.getPlaylist.addSong(file.getPath)
|
||||
return true
|
||||
}
|
||||
|
||||
def play(): Unit =
|
||||
instance.getPlayer.play()
|
||||
|
||||
def clear(): Unit =
|
||||
instance.getPlaylist.clearPlaylist()
|
||||
|
||||
case class Song(title: String, artist: String, album: String, position: Int)
|
||||
object Song
|
||||
{
|
||||
def apply(song: MPDSong): Song =
|
||||
Song(
|
||||
title = song.getTitle,
|
||||
artist = song.getArtistName,
|
||||
album = song.getAlbumName,
|
||||
position = song.getPosition
|
||||
)
|
||||
implicit val rw: ReadWriter[Song] = macroRW[Song]
|
||||
}
|
||||
|
||||
val status = new Checkpoint[String]("paused")
|
||||
val playlist = new Checkpoint[Seq[Song]](Seq.empty)
|
||||
val currentTrack = new Checkpoint[Option[Song]](None)
|
||||
val position = new Checkpoint[Long](0)
|
||||
|
||||
override def setup(): Unit =
|
||||
{
|
||||
val bridge = config.obj.get("bridge").map { _.bool }.getOrElse(false)
|
||||
val useProxy = config.obj.get("use_proxy").map { _.bool }.getOrElse(false)
|
||||
implicit val owner: Owner = Owner.global
|
||||
|
||||
if(bridge && !useProxy)
|
||||
{
|
||||
Mqtt(s"$topic/status") << status
|
||||
Mqtt(s"$topic/playlist") << playlist
|
||||
Mqtt(s"$topic/now_playing") << currentTrack
|
||||
Mqtt(s"$topic/position") << position
|
||||
}
|
||||
if(useProxy)
|
||||
{
|
||||
status << Mqtt(s"$topic/status").asString
|
||||
playlist << Mqtt(s"$topic/playlist").asJson[Seq[Song]]
|
||||
currentTrack << Mqtt(s"$topic/now_playing").asJson[Option[Song]]
|
||||
position << Mqtt(s"$topic/position").asLong
|
||||
} else
|
||||
{
|
||||
val monitor = instance.getMonitor
|
||||
monitor.addPlayerChangeListener(new PlayerBasicChangeListener(){
|
||||
def playerBasicChange(evt: PlayerBasicChangeEvent) =
|
||||
status << (
|
||||
evt.getStatus match {
|
||||
case PlayerBasicChangeEvent.Status.PLAYER_PAUSED => "pause"
|
||||
case PlayerBasicChangeEvent.Status.PLAYER_STARTED => "play"
|
||||
case PlayerBasicChangeEvent.Status.PLAYER_STOPPED => "pause"
|
||||
case PlayerBasicChangeEvent.Status.PLAYER_UNPAUSED => "play"
|
||||
}
|
||||
)
|
||||
})
|
||||
monitor.addPlaylistChangeListener(new PlaylistBasicChangeListener(){
|
||||
def playlistBasicChange(evt: PlaylistBasicChangeEvent) =
|
||||
evt.getEvent match {
|
||||
case PlaylistBasicChangeEvent.Event.PLAYLIST_CHANGED
|
||||
| PlaylistBasicChangeEvent.Event.PLAYLIST_ENDED =>
|
||||
playlist << (
|
||||
instance.getPlaylist.getSongList.asScala.toSeq.map {
|
||||
Song(_)
|
||||
}
|
||||
)
|
||||
currentTrack << None
|
||||
case PlaylistBasicChangeEvent.Event.SONG_CHANGED =>
|
||||
currentTrack << (
|
||||
Option(instance.getPlaylist.getCurrentSong)
|
||||
.map { Song(_) }
|
||||
)
|
||||
case _ => ()
|
||||
}
|
||||
})
|
||||
|
||||
monitor.addTrackPositionChangeListener(new TrackPositionChangeListener(){
|
||||
def trackPositionChanged(evt: TrackPositionChangeEvent) =
|
||||
position << evt.getElapsedTime
|
||||
})
|
||||
|
||||
monitor.start()
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package net.okennedy.shingle
|
||||
package net.okennedy.shingle.component
|
||||
|
||||
import org.fusesource.mqtt.{ client => fuse }
|
||||
import scala.collection.mutable
|
||||
|
@ -8,13 +8,16 @@ import java.util.concurrent.TimeUnit
|
|||
import org.fusesource.mqtt.client.Tracer
|
||||
import org.fusesource.mqtt.codec.MQTTFrame
|
||||
import com.typesafe.scalalogging.LazyLogging
|
||||
import net.okennedy.shingle.Shingle
|
||||
|
||||
object Mqtt
|
||||
extends Thread
|
||||
with Component
|
||||
with LazyLogging
|
||||
{
|
||||
lazy val config = Shingle.config("mqtt")
|
||||
def host = config("host").str
|
||||
def name = "MQTT Bus"
|
||||
def slug = "mqtt"
|
||||
lazy val host = config("host").str
|
||||
|
||||
lazy val mqtt = {
|
||||
val mqtt = new fuse.MQTT()
|
||||
|
@ -154,6 +157,8 @@ class MqttPath(val fullPath: String, parent: Option[MqttPath])
|
|||
<<(value.toString.getBytes)
|
||||
def <<[T: Writer](value: T): Unit =
|
||||
<<(writeJs(value))
|
||||
def <<[T: Writer](stream: Stream[T])(implicit owner: Owner): Unit =
|
||||
stream.trigger { this << _ }
|
||||
|
||||
def apply(pathStep: String): MqttPath =
|
||||
contents.getOrElseUpdate(
|
|
@ -1,12 +1,14 @@
|
|||
package net.okennedy.shingle.module
|
||||
package net.okennedy.shingle.component
|
||||
|
||||
import net.okennedy.shingle.Shingle
|
||||
import net.okennedy.shingle.Mqtt
|
||||
import net.okennedy.shingle.stream.{Stream, Owner}
|
||||
import net.okennedy.shingle.stream.{Stream, Sink, Owner}
|
||||
|
||||
object Notifications
|
||||
object Notifications extends Component
|
||||
{
|
||||
lazy val config = Shingle.config("notification").obj
|
||||
def name = "Notification System"
|
||||
def slug = "notification"
|
||||
override val depends = Set(Mqtt.slug)
|
||||
|
||||
lazy val topic =
|
||||
config("topic").str
|
||||
lazy val root =
|
||||
|
@ -29,23 +31,26 @@ object Notifications
|
|||
def label = "info"
|
||||
}
|
||||
|
||||
class Heading(id: String)
|
||||
class Heading(id: String)(implicit owner: Owner) extends Sink[Option[Notification]]
|
||||
{
|
||||
val target = root(id).isState
|
||||
val last: Option[Notification] = null
|
||||
|
||||
def <<(msg: Stream[Option[Notification]])(implicit owner: Owner) =
|
||||
def receive(n: Option[Notification]) =
|
||||
{
|
||||
msg.onAnyChangeIncludingFirst
|
||||
.trigger {
|
||||
case None =>
|
||||
target << ""
|
||||
case Some( notification ) =>
|
||||
target << notification.json
|
||||
}
|
||||
if(last == null || n != last)
|
||||
{
|
||||
n match {
|
||||
case None =>
|
||||
target << ""
|
||||
case Some( notification ) =>
|
||||
target << notification.json
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def apply(id: String) = new Heading(id)
|
||||
def apply(id: String)(implicit owner: Owner) = new Heading(id)
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,45 @@
|
|||
package net.okennedy.shingle.component
|
||||
|
||||
import net.okennedy.shingle.Shingle
|
||||
import org.fusesource.mqtt.client.QoS
|
||||
import scala.util.Try
|
||||
import scala.util.Failure
|
||||
import scala.util.Success
|
||||
import net.okennedy.shingle.stream.Owner
|
||||
import net.okennedy.shingle.cron.IntervalCronEvent
|
||||
import net.okennedy.shingle.cron.Cron
|
||||
import scala.concurrent.duration._
|
||||
import requests.RequestFailedException
|
||||
import net.okennedy.shingle.stream._
|
||||
|
||||
case class RestAPI(
|
||||
val url: String,
|
||||
val headers: Iterable[(String, String)] = Seq.empty,
|
||||
)
|
||||
{
|
||||
def get: Try[String] =
|
||||
{
|
||||
println(s"Fetching $url")
|
||||
try {
|
||||
val result = requests.get(
|
||||
url = url,
|
||||
headers = headers,
|
||||
)
|
||||
|
||||
if(result.statusCode == 200){ Success(result.string()) }
|
||||
else { Failure(new Exception(s"Result ${result.statusCode} when fetching $url")) }
|
||||
} catch {
|
||||
case e: RequestFailedException => Failure(e)
|
||||
}
|
||||
}
|
||||
|
||||
def poll(interval: FiniteDuration = 1.hour)(implicit owner: Owner): StringStream =
|
||||
Timer.every(interval)
|
||||
.flatMap { _ => print("Poll!"); get match {
|
||||
case Success(r) =>
|
||||
Some(r)
|
||||
case Failure(err) =>
|
||||
println(s"Error fetching $url: $err");
|
||||
None
|
||||
} }
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package net.okennedy.shingle
|
||||
package net.okennedy.shingle.component
|
||||
|
||||
import java.time.LocalDateTime
|
||||
import java.time.temporal.ChronoUnit
|
|
@ -0,0 +1,57 @@
|
|||
package net.okennedy.shingle.component
|
||||
|
||||
import upickle.default._
|
||||
import net.okennedy.shingle.cron.IntervalCronEvent
|
||||
import java.time.Duration
|
||||
import net.okennedy.shingle.cron.Cron
|
||||
import net.okennedy.shingle.stream._
|
||||
import scala.util.Success
|
||||
import scala.util.Failure
|
||||
import scala.concurrent.duration._
|
||||
|
||||
object Weather extends Component
|
||||
{
|
||||
def name = "Weather.gov Integration"
|
||||
def slug = "weather"
|
||||
|
||||
lazy val api =
|
||||
RestAPI(url = config("url").str)
|
||||
|
||||
lazy val state: Checkpoint[ujson.Value] =
|
||||
{
|
||||
val state =
|
||||
api.poll(1.hour)(Owner.global)
|
||||
.asJson[ujson.Value]
|
||||
.asState(Owner.global)
|
||||
try {
|
||||
state.receive(ujson.read(api.get.get))
|
||||
} catch {
|
||||
case t: Throwable =>
|
||||
println(s"Error initializing Weather. Will try again in 1 hour.\n${t.getMessage}")
|
||||
}
|
||||
state
|
||||
}
|
||||
|
||||
|
||||
def forecast =
|
||||
ujson.transform(
|
||||
state.now("properties")("periods"),
|
||||
reader[Seq[WeatherPrediction]]
|
||||
)
|
||||
}
|
||||
|
||||
case class WeatherPrediction(
|
||||
name: String,
|
||||
isDaytime: Boolean,
|
||||
temperature: Double,
|
||||
temperatureUnit: String,
|
||||
windSpeed: String,
|
||||
windDirection: String,
|
||||
icon: String,
|
||||
shortForecast: String,
|
||||
detailedForecast: String
|
||||
)
|
||||
object WeatherPrediction
|
||||
{
|
||||
implicit val rw: ReadWriter[WeatherPrediction] = macroRW[WeatherPrediction]
|
||||
}
|
|
@ -85,9 +85,8 @@ object Module
|
|||
engine.eval(
|
||||
s"""net.okennedy.shingle.module.Module("$module") { implicit ctx =>
|
||||
| import net.okennedy.shingle._;
|
||||
| import net.okennedy.shingle.component._;
|
||||
| import net.okennedy.shingle.cron.Cron;
|
||||
| import net.okennedy.shingle.module.Notification;
|
||||
| import net.okennedy.shingle.module.Notifications;
|
||||
| import scala.concurrent.duration._;
|
||||
| $script
|
||||
|}
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
package net.okennedy.shingle.stream
|
||||
|
||||
trait Sink[T]
|
||||
{
|
||||
def receive(value: T): Unit
|
||||
|
||||
def <<(source: Stream[T])(implicit owner: Owner): Sink[T] =
|
||||
{
|
||||
source.trigger { receive(_) }; this
|
||||
}
|
||||
|
||||
def <<(value: T): Sink[T] =
|
||||
{
|
||||
receive(value); this
|
||||
}
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
package net.okennedy.shingle.stream
|
||||
|
||||
trait State[T]
|
||||
{
|
||||
def now: T
|
||||
}
|
||||
|
||||
class Checkpoint[T](init: Option[T] = None) extends State[T] with Sink[T] with Dispatchable[T]
|
||||
{
|
||||
private var __value: Option[T] = init
|
||||
|
||||
def this(init: T) = this(Some(init))
|
||||
|
||||
def now:T =
|
||||
__value.getOrElse {
|
||||
throw new RuntimeException("Checkpoint hasn't received a value yet")
|
||||
}
|
||||
|
||||
def receive(incoming: T): Unit =
|
||||
{
|
||||
__value = Some(incoming)
|
||||
dispatch(incoming)
|
||||
}
|
||||
}
|
|
@ -4,9 +4,6 @@ import scala.collection.mutable
|
|||
import upickle.default._
|
||||
import ujson.Bool
|
||||
import java.util.Arrays
|
||||
import net.okennedy.shingle.MqttPath
|
||||
import upickle.default._
|
||||
import net.okennedy.shingle.Mqtt
|
||||
|
||||
trait Stream[T]
|
||||
{
|
||||
|
@ -24,6 +21,19 @@ trait Stream[T]
|
|||
}
|
||||
}
|
||||
|
||||
def flatMap[T2](f: T => Iterable[T2]): Stream[T2] =
|
||||
{
|
||||
def baseTrigger(handler: T => Unit, owner: Owner) = trigger(handler)(owner)
|
||||
new Stream[T2] {
|
||||
def trigger(handler: T2 => Unit)(implicit owner: Owner): Unit =
|
||||
{
|
||||
baseTrigger({
|
||||
value => f(value).foreach { handler(_) }
|
||||
}, owner)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def filter(f: T => Boolean): Stream[T] =
|
||||
{
|
||||
def baseTrigger(handler: T => Unit, owner: Owner) = trigger(handler)(owner)
|
||||
|
@ -62,23 +72,16 @@ trait Stream[T]
|
|||
def onAnyChangeIncludingFirst: TriggerableOnChange[T] =
|
||||
new TriggerableOnChange(this, _ equals _, false) { }
|
||||
|
||||
def aliasAsJson(label: String)(implicit owner: Owner, writes: Writer[T]): MqttPath =
|
||||
{
|
||||
val ret = Mqtt(label)
|
||||
trigger { ret << _ }
|
||||
ret
|
||||
}
|
||||
def uniq = onAnyChange
|
||||
|
||||
def aliasAsString(label: String)(implicit owner: Owner): MqttPath =
|
||||
def asState(implicit owner: Owner): Checkpoint[T] =
|
||||
{
|
||||
val ret = Mqtt(label)
|
||||
trigger { ret << _.toString }
|
||||
ret
|
||||
val ret = new Checkpoint[T]()
|
||||
ret << this
|
||||
return ret
|
||||
}
|
||||
}
|
||||
|
||||
trait StreamMap[T1,T2] extends Stream[T2]
|
||||
|
||||
trait Dispatchable[T] extends Stream[T]
|
||||
{
|
||||
private val triggers = mutable.HashMap[String, Trigger[T]]()
|
||||
|
@ -131,6 +134,7 @@ trait ByteArrayStream extends Stream[Array[Byte]]
|
|||
{
|
||||
def asString = map { new String(_) }
|
||||
def asInt = map { new String(_).toInt }
|
||||
def asLong = map { new String(_).toLong }
|
||||
def asJson[T](implicit reader: Reader[T]) = map { ujson.transform(_, reader) }
|
||||
|
||||
override def onAnyChange: ByteArrayStream with TriggerableOnChange[Array[Byte]] =
|
||||
|
@ -139,5 +143,28 @@ trait ByteArrayStream extends Stream[Array[Byte]]
|
|||
override def onAnyChangeIncludingFirst: ByteArrayStream with TriggerableOnChange[Array[Byte]] =
|
||||
new ByteArrayStream with TriggerableOnChange[Array[Byte]](this, Arrays.equals(_, _), false)
|
||||
}
|
||||
implicit class ConvertedByteArrayStream(source: Stream[Array[Byte]]) extends ByteArrayStream
|
||||
{
|
||||
def trigger(handler: Array[Byte] => Unit)(implicit owner: Owner): Unit =
|
||||
source.trigger(handler)(owner)
|
||||
}
|
||||
|
||||
trait StringStream extends Stream[String]
|
||||
{
|
||||
def asInt = map { _.toInt }
|
||||
def asLong = map { _.toLong }
|
||||
def asJson[T](implicit reader: Reader[T]) = map { ujson.transform(_, reader) }
|
||||
|
||||
override def onAnyChange: StringStream with TriggerableOnChange[String] =
|
||||
new StringStream with TriggerableOnChange[String](this, _ == _)
|
||||
|
||||
override def onAnyChangeIncludingFirst: StringStream with TriggerableOnChange[String] =
|
||||
new StringStream with TriggerableOnChange[String](this, _ == _, false)
|
||||
}
|
||||
implicit class ConvertedStringStream(source: Stream[String]) extends StringStream
|
||||
{
|
||||
def trigger(handler: String => Unit)(implicit owner: Owner): Unit =
|
||||
source.trigger(handler)(owner)
|
||||
}
|
||||
|
||||
class Trigger[T](val handler: T => Unit, val owner: Owner)
|
||||
|
|
Loading…
Reference in New Issue