Compare commits

...

4 Commits

Author SHA1 Message Date
Oliver Kennedy e68617b470
Much cleanup, and Hass integration
- MPD is now a bit more sensible... it doesn't auto-bridge to MQTT, only by request.
- Java SE websockets are fail.  Found another websocket client based on Eclipse stuffs and it seems to be working well enough.
- Refactored the components into a new components package.
- Component initialization is now predicated on the appropriate token appearing in the config file.
- Added implicit stream conversion to string and byte array streams
- Hass websockets live!
2023-06-04 01:50:50 -04:00
Oliver Kennedy 2135576901
Merge branch 'main' of heracles.local:okennedy/Shingle 2023-06-03 18:32:23 -04:00
Oliver Kennedy c5a82e13f6
Merge remote-tracking branch 'origin/main' 2023-05-31 22:47:47 -04:00
Oliver Kennedy e98314d488
Bugfixes, aliases, and standardization
- RestAPI now handles failures gracefully
- Webserver now uses more standard paths for module load and unload
- Added uniq and mqtTee aliases for existing functions
2023-05-31 21:39:01 -04:00
21 changed files with 671 additions and 418 deletions

View File

@ -106,6 +106,26 @@ Utilities for working with REST APIs.
* `RestAPI(url).poll(topic, interval = 1.hour)`: Poll the provided URL every interval and post the result to the specified MQTT topic.
* `RestAPI(url).get`: Retrieve the current contents of the GET url as a Scala `Try[String]`
#### MPD
The [music player daemon](https://www.musicpd.org/). To use the MPD integration, you need to provide access information in your config file:
```
"mpd" : {
"host" : "localhost",
}
```
* `MPD.status`: A stream of the MPD status
* `MPD.playlist`: A stream of MPD's current playlist
* `MPD.currentTrack`: A stream of the currently playing song
* `MPD.position`: A stream of the current timestamp in the song
* `MPD.play()`: Start playing the current playlist
* `MPD.clear()`: Clear the current playlist
* `MPD.list(path)`: List the MPD-local files at the specified path.
* `MPD.savedPlaylists`: List all of the MPD saved playlists
* `MPD.savedPlaylist(name)`: List all songs in the specified saved playlist
* `MPD.append(path)`: Add the song at the specified path to the end of the playlist
## Compiling Shingle
#### Setup

View File

@ -20,7 +20,8 @@ object shingle extends ScalaModule
ivy"org.slf4j:slf4j-api:1.7.36",
ivy"ch.qos.logback:logback-classic:1.2.10",
ivy"com.typesafe.scala-logging::scala-logging::3.9.4",
ivy"org.scala-lang::scala3-compiler::3.1.2"
ivy"org.scala-lang::scala3-compiler::3.1.2",
ivy"net.domlom::websocket-scala:0.0.3".withDottyCompat(scalaVersion()),
)
}

View File

@ -1,47 +0,0 @@
package net.okennedy.shingle
object Hass
{
val config = Shingle.config("hass").obj
val host = config("host").str
val token = config("token").str
def post(path: String, body: ujson.Value): Unit =
{
println(s"POST: $host/api/$path")
println(body.render())
requests.post(
s"$host/api/$path",
data = body.render(),
headers = Map(
"Authorization" -> s"Bearer $token",
"Content-Type" -> "application/json"
)
)
}
def set(entity: String, state: String, attributes: (String, ujson.Value)*): Unit =
{
post(
s"states/$entity",
ujson.Obj(
"state" -> state,
"attributes" -> (
if(attributes.isEmpty) { ujson.Obj() }
else { ujson.Obj(attributes.head, attributes.tail:_*) }
)
)
)
}
def service(domain: String, service: String, data: (String, ujson.Value)*): Unit =
{
post(
s"services/$domain/$service",
(
if(data.isEmpty) { ujson.Obj() }
else { ujson.Obj(data.head, data.tail:_*) }
)
)
}
}

View File

@ -1,132 +0,0 @@
package net.okennedy.shingle
import org.bff.javampd.file.MPDFile
import org.bff.javampd.server.{ MPD => MPDServer }
import collection.JavaConverters.collectionAsScalaIterableConverter
import org.bff.javampd.song.MPDSong
import org.bff.javampd.output.OutputChangeListener
import org.bff.javampd.output.OutputChangeEvent
import org.bff.javampd.player.PlayerChangeListener
import org.bff.javampd.player.PlayerChangeEvent
import org.bff.javampd.player.PlayerBasicChangeListener
import org.bff.javampd.player.PlayerBasicChangeEvent
import org.bff.javampd.playlist.PlaylistBasicChangeListener
import org.bff.javampd.playlist.PlaylistBasicChangeEvent
import org.bff.javampd.server.ConnectionChangeListener
import org.bff.javampd.server.ConnectionChangeEvent
import org.bff.javampd.player.TrackPositionChangeListener
import org.bff.javampd.player.TrackPositionChangeEvent
object MPD
{
def config = Shingle.config("mpd").obj
lazy val instance =
MPDServer.Builder()
.server(config("host").str)
.build()
lazy val topic =
config("topic").str
lazy val db =
instance.getMusicDatabase()
def apply(path: String): Option[MPDFile] =
{
var curr: Option[MPDFile] = None
var dir: Iterable[MPDFile] = Seq.empty
for(i <- path.split("/")){
if(i != ""){
dir = curr match {
case None => db.getFileDatabase.listRootDirectory.asScala
case Some(f) => {
if(!f.isDirectory){ return None }
db.getFileDatabase.listDirectory(f).asScala
}
}
curr = dir.find { _.getPath.split("/").last == i }
if(curr.isEmpty){ return None }
}
}
return curr
}
def savedPlaylists: Seq[String] =
db.getPlaylistDatabase.listPlaylists.asScala.toSeq
def savedPlaylist(name: String): Seq[MPDSong] =
db.getPlaylistDatabase.listPlaylistSongs(name).asScala.toSeq
def list(path: String): Seq[MPDFile] =
apply(path).toSeq.flatMap { db.getFileDatabase.listDirectory(_).asScala.toSeq }
def append(path: String): Boolean =
{
val file = apply(path).getOrElse { return false }
instance.getPlaylist.addSong(file.getPath)
return true
}
def play(): Unit =
instance.getPlayer.play()
def clear(): Unit =
instance.getPlaylist.clearPlaylist()
implicit def songToJson(song: MPDSong): ujson.Obj =
ujson.Obj(
"title" -> song.getTitle,
"artist" -> song.getArtistName,
"album" -> song.getAlbumName,
"position" -> song.getPosition
)
val status = Mqtt(s"$topic/status").isState
val playlist = Mqtt(s"$topic/playlist").isState
val currentTrack = Mqtt(s"$topic/now_playing").isState
val position = Mqtt(s"$topic/position").isState
if(config.get("proxy").map { _.bool }.getOrElse(true)){
status.subscribe
playlist.subscribe
currentTrack.subscribe
position.subscribe
} else {
val monitor = instance.getMonitor
monitor.addPlayerChangeListener(new PlayerBasicChangeListener(){
def playerBasicChange(evt: PlayerBasicChangeEvent) =
status << (
evt.getStatus match {
case PlayerBasicChangeEvent.Status.PLAYER_PAUSED => "pause"
case PlayerBasicChangeEvent.Status.PLAYER_STARTED => "play"
case PlayerBasicChangeEvent.Status.PLAYER_STOPPED => "pause"
case PlayerBasicChangeEvent.Status.PLAYER_UNPAUSED => "play"
}
)
})
monitor.addPlaylistChangeListener(new PlaylistBasicChangeListener(){
def playlistBasicChange(evt: PlaylistBasicChangeEvent) =
evt.getEvent match {
case PlaylistBasicChangeEvent.Event.PLAYLIST_CHANGED
| PlaylistBasicChangeEvent.Event.PLAYLIST_ENDED =>
playlist << (
ujson.Arr.from(
instance.getPlaylist.getSongList.asScala.toSeq.map { songToJson(_) }
)
)
case PlaylistBasicChangeEvent.Event.SONG_CHANGED =>
currentTrack << (
songToJson(instance.getPlaylist.getCurrentSong)
)
case _ => ()
}
})
monitor.addTrackPositionChangeListener(new TrackPositionChangeListener(){
def trackPositionChanged(evt: TrackPositionChangeEvent) =
position << evt.getElapsedTime
})
monitor.start()
}
}

View File

@ -1,41 +0,0 @@
package net.okennedy.shingle
import net.okennedy.shingle.Shingle
import org.fusesource.mqtt.client.QoS
import scala.util.Try
import scala.util.Failure
import scala.util.Success
import net.okennedy.shingle.stream.Owner
import net.okennedy.shingle.cron.IntervalCronEvent
import net.okennedy.shingle.cron.Cron
import scala.concurrent.duration._
case class RestAPI(
val url: String,
val headers: Iterable[(String, String)] = Seq.empty,
)
{
def get: Try[String] =
{
println(s"Fetching $url")
val result = requests.get(
url = url,
headers = headers,
)
if(result.statusCode == 200){ Success(result.string()) }
else { Failure(new Exception(s"Result ${result.statusCode} when fetching $url")) }
}
def poll(topic: MqttPath, interval: FiniteDuration = 1.hour)(implicit owner: Owner) =
Cron.add(
new IntervalCronEvent(interval) {
// override def immediate = true
def trigger() =
get match {
case Success(r) => topic << r
case Failure(err) => println(s"Error fetching $url: $err")
}
}
)
}

View File

@ -6,6 +6,9 @@ import net.okennedy.shingle.module.Module
import java.util.Calendar
import cask.main.Routes
import net.okennedy.shingle.cron.Cron
import net.okennedy.shingle.component.Component
import java.io.FileNotFoundException
import scala.collection.mutable
object Shingle
extends cask.Main
@ -14,123 +17,49 @@ object Shingle
override def allRoutes: Seq[Routes] = Seq(Webserver)
override def port: Int = 4000
val config = ujson.read(Source.fromFile(
System.getProperty("user.home") + File.separator + ".shingle" + File.separator + "config.json"
).getLines.mkString("\n"))
val configFile = System.getProperty("user.home") + File.separator + ".shingle" + File.separator + "config.json"
val config =
try {
ujson.read(Source.fromFile(configFile)
.getLines
.mkString("\n"))
.obj
} catch {
case f: FileNotFoundException =>
throw new RuntimeException(
s"No shingle config file at $configFile"
)
}
override def main(args: Array[String]): Unit =
{
println("Welcome to Shingle\nInitializing libraries...")
println("Welcome to Shingle\nInitializing components...")
println("...Mqtt")
// Mqtt.start()
println("...MPD")
// MPD
println("...Hass")
// Hass
println("...Weather")
// Weather
println("...Cron")
val active = mutable.Set[String]()
for(component <- Component.all){
config.get(component.slug)
.map { componentConfig =>
val missingDeps = component.depends -- active
if(missingDeps.isEmpty)
{
component.init(componentConfig)
active.add(component.slug)
println(s" ... initialized ${component.name}")
} else {
println(s" ... skipping ${component.name}; Missing dependencies on ${missingDeps.map { Component(_).name }.mkString(", ")}")
}
}.getOrElse {
println(s" ... Missing configuration for ${component.name}")
}
}
println("Initializing timer thread...")
Cron.handler.start()
println("Installing user modules...")
Module.init()
println("Shingle is running.")
println(s" ... admin interface on http://localhost:${port}/api")
super.main(args)
// Module("TIME_INTENTS") { implicit ctx =>
// var kitchenTimer: Timer = new Timer(Mqtt("timer/kitchen"))
// kitchenTimer.done.trigger {
// _ => Hermes.say("The timer is done")
// }
// Hermes.registerIntent("SetTimer"){ slots =>
// val t = slots("count").num * slots("unit").num
// val (duration, _) = Seq(
// (3600, "hour"),
// (60, "minute"),
// (1, "second")
// ).foldLeft ( (Seq[String](), t.toLong) ) {
// case ( (accum, remainder), (unit, label) ) =>
// if(remainder > unit){
// val diff = (remainder / unit).toLong
// (
// accum :+ s"$diff ${label}${if(diff > 1){ "s" } else { "" }}",
// remainder % unit
// )
// } else {
// (accum, remainder)
// }
// }
// Hermes.say(s"Setting timer for ${duration.mkString(", ")}")
// kitchenTimer.go(
// end = java.time.LocalDateTime.now.plusSeconds(t.toLong)
// )
// }
// }
// Module("GARAGE_SENSORS") { implicit ctx =>
// def notify(msg: String) =
// {
// Hermes.say(msg)
// Hass.service("notify", "matrix_home", "message" -> msg)
// }
// // Mqtt("sensor/garage/door")
// // .onAnyChange
// // .asString
// // .trigger {
// // // temporary bug in the sensor code flips the states...
// // case "closed" => notify("The garage door is closed")
// // case "open" => notify("The garage door is open")
// // case _ => ()
// // }
// Mqtt("sensor/garage/distance")
// .asString
// .map {
// case "borked" => "unknown"
// case x if x.toInt < 200 => "parked"
// case x if x.toInt < 250 => "parking-near"
// case x if x.toInt < 300 => "parking-far"
// case _ => "empty"
// }
// .aliasAsString("sensor/garage/parking")
// .onAnyChange
// .asJson[String]
// .debounce(5000)
// .trigger {
// case "parked" => notify("The car is parked")
// case "empty" => notify("The car has departed")
// case _ => ()
// }
// }
// Module("GARAGE_LIGHT"){ implicit ctx =>
// val lights = Seq(
// "red", "yellow", "green"
// ).map { x => Mqtt(s"light/garage/tower/$x") }
// def setLight(target: Int) =
// for((light, idx) <- lights.zipWithIndex){
// if(idx == target) { light << "on" }
// else { light << "off" }
// }
// Mqtt("sensor/garage/parking")
// .asString
// .onAnyChange
// .join(Mqtt("sensor/garage/door").onAnyChange.asString)
// .trigger {
// case ("parked" , "open") => setLight(0)
// case ("parking-near", "open") => setLight(1)
// case ("parking-far" , "open") => setLight(2)
// case (_ , _) => setLight(3)
// }
// }
}
}

View File

@ -1,46 +0,0 @@
package net.okennedy.shingle
import upickle.default._
import net.okennedy.shingle.cron.IntervalCronEvent
import java.time.Duration
import net.okennedy.shingle.cron.Cron
import net.okennedy.shingle.stream.Owner
import scala.util.Success
import scala.util.Failure
object Weather
{
val api =
RestAPI(
url = "https://api.weather.gov/gridpoints/BUF/39,52/forecast?units=us"
)
val state = Mqtt("status/weather").isState.subscribe
def forecast =
ujson.transform(
state.json("properties")("periods"),
reader[Seq[WeatherPrediction]]
)
api.get match {
case Success(r) => state << r
case Failure(exception) => s"Error on initial weather fetch: ${exception}"
}
api.poll(state)(Owner.global)
}
case class WeatherPrediction(
name: String,
isDaytime: Boolean,
temperature: Double,
temperatureUnit: String,
windSpeed: String,
windDirection: String,
icon: String,
shortForecast: String,
detailedForecast: String
)
object WeatherPrediction
{
implicit val rw: ReadWriter[WeatherPrediction] = macroRW[WeatherPrediction]
}

View File

@ -19,14 +19,26 @@ object Webserver extends cask.Routes
Module.read(id)
}
@cask.get("api/load/:id")
@cask.get("api/modules/:id/load")
def saveModule(id: String) = {
Module.reload(id)
try {
Module.reload(id)
"Success"
} catch {
case t: Throwable =>
t.toString()
}
}
@cask.get("api/unload/:id")
@cask.get("api/modules/:id/unload")
def rmModule(id: String) = {
Module.unload(id)
try {
Module.unload(id)
"Success"
} catch {
case t: Throwable =>
t.toString()
}
}
@cask.get("api/active")

View File

@ -0,0 +1,41 @@
package net.okennedy.shingle.component
import net.okennedy.shingle.Shingle
trait Component
{
def name: String
def depends: Set[String] = Set.empty
def slug: String
def setup(): Unit = {}
var __config: Option[ujson.Value] = None
def config: ujson.Value =
__config.getOrElse {
throw new RuntimeException(s"Component $name: has not been initialized. Add a `$slug` field to the configuration file (${Shingle.configFile}")
}
def init(newConfig: ujson.Value): Unit =
{
__config = Some(newConfig)
setup()
}
}
object Component
{
val all = Seq[Component](
Mqtt,
Hass,
Hermes,
MPD,
Weather,
Notifications
)
val byName = all.map { x => x.name -> x }.toMap
def apply(slug: String) =
byName(slug)
}

View File

@ -0,0 +1,173 @@
package net.okennedy.shingle.component
import net.domlom.websocket._
import scala.concurrent.Future
import scala.concurrent.duration._
import upickle.default._
import net.okennedy.shingle.stream.Checkpoint
import scala.collection.concurrent
import scala.util.Failure
import scala.util.Success
object Hass extends Component
{
implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global
def name = "Home Assistant"
def slug = "hass"
lazy val host = config("host").str
lazy val token = config("token").str
def post(path: String, body: ujson.Value): Unit =
{
println(s"POST: $host/api/$path")
println(body.render())
requests.post(
s"$host/api/$path",
data = body.render(),
headers = Map(
"Authorization" -> s"Bearer $token",
"Content-Type" -> "application/json"
)
)
}
def set(entity: String, state: String, attributes: (String, ujson.Value)*): Unit =
{
post(
s"states/$entity",
ujson.Obj(
"state" -> state,
"attributes" -> (
if(attributes.isEmpty) { ujson.Obj() }
else { ujson.Obj(attributes.head, attributes.tail:_*) }
)
)
)
}
def service(domain: String, service: String, data: (String, ujson.Value)*): Unit =
{
post(
s"services/$domain/$service",
(
if(data.isEmpty) { ujson.Obj() }
else { ujson.Obj(data.head, data.tail:_*) }
)
)
}
def apply(entity: String): Checkpoint[String] =
{
socket // trigger the socket connection if needed
entities.getOrElseUpdate(entity, new Checkpoint[String]("unknown"))
}
private val entities = concurrent.TrieMap[String, Checkpoint[String]]()
private var commandId: Long = 0
lazy val socket =
{
var url = s"$host/api/websocket"
if(url.startsWith("http")){
url = "ws"+url.drop(4)
} else if(!url.startsWith("wss")) {
url = "ws://"+url
}
println(s"Hass websocket ($url) initializing")
val socket = Websocket(url,
WebsocketBehavior(
onOpen = { _ => println("Hass websocket open") },
onMessage = { (_, message) =>
try {
val msg = ujson.read(message.value)
// println(msg)
msg("type").str match {
case "auth_required" => login()
case "auth_ok" => subscribe()
case "auth_invalid" =>
println("Hass login invalid. Disabling websocket.")
close()
case "event" =>
val event = msg("event")
event("event_type").str match {
case "state_changed" =>
val entity = event("data")("entity_id").str
val state = event("data")("new_state")("state")
// println(s"Hass: $entity <- $state")
apply(entity).receive(state.str)
}
case "result" =>
() // ignore me for now
case _ =>
println(s"Hass sent a message of an unknown type: $msg")
}
} catch {
case t: Throwable =>
println("Error while processing a Hass message:")
t.printStackTrace()
}
},
onClose = { reason =>
println("Hass websocket disconnected. \n$reason\n ... Attempting reconnect in 10s")
reconnect()
},
onError = { (_, err) =>
println(err.getMessage)
}
),
debugMode = false
)
socket.connect() match {
case Success(msg) => println(s"Hass: $msg")
case Failure(err) => println(s"Error connecting Hass Websocket ${err.getMessage()}");
}
/* return */ socket
}
private def reconnect(delay: Duration = 10.seconds): Unit =
{
Future {
Thread.sleep(delay.toMillis)
socket.connect()
}
}
private def login(): Unit =
{
socket.sendSync(
ujson.Obj(
"type" -> "auth",
"access_token" -> token
).toString
)
}
private def close(): Unit =
socket.close()
private def sendCommand(args: (String, ujson.Value)*): Unit =
{
commandId += 1
socket.sendSync(
ujson.Obj(
"id" -> commandId,
args:_*
).toString
)
}
private def subscribe(): Unit =
{
println("Hass websocket subscribing to events")
sendCommand(
"type" -> ujson.Str("subscribe_events"),
"event_type" -> ujson.Str("state_changed")
)
}
private case class Login(api_password: String, `type`:String = "auth")
}

View File

@ -1,13 +1,17 @@
package net.okennedy.shingle
package net.okennedy.shingle.component
import upickle.default._
import net.okennedy.shingle.stream.Owner
object Hermes
object Hermes extends Component
{
def name = "Hermes Voice Assistant Integration"
def slug = "hermes"
override def depends = Set(Mqtt.slug)
lazy val defaults =
Shingle.config("hermes")("default")
config("default")
lazy val defaultSiteId =
defaults("siteId").str

View File

@ -0,0 +1,161 @@
package net.okennedy.shingle.component
import org.bff.javampd.file.MPDFile
import org.bff.javampd.server.{ MPD => MPDServer }
import collection.JavaConverters.collectionAsScalaIterableConverter
import org.bff.javampd.song.MPDSong
import org.bff.javampd.output.OutputChangeListener
import org.bff.javampd.output.OutputChangeEvent
import org.bff.javampd.player.PlayerChangeListener
import org.bff.javampd.player.PlayerChangeEvent
import org.bff.javampd.player.PlayerBasicChangeListener
import org.bff.javampd.player.PlayerBasicChangeEvent
import org.bff.javampd.playlist.PlaylistBasicChangeListener
import org.bff.javampd.playlist.PlaylistBasicChangeEvent
import org.bff.javampd.server.ConnectionChangeListener
import org.bff.javampd.server.ConnectionChangeEvent
import org.bff.javampd.player.TrackPositionChangeListener
import org.bff.javampd.player.TrackPositionChangeEvent
import net.okennedy.shingle.stream.Checkpoint
import net.okennedy.shingle.stream.Owner
import upickle.default._
object MPD
extends Component
{
def name = "Music Player Daemon"
def slug = "mpd"
override def depends = Set(Mqtt.slug)
lazy val instance =
MPDServer.Builder()
.server(config("host").str)
.build()
lazy val topic =
config("topic").str
lazy val db =
instance.getMusicDatabase()
def apply(path: String): Option[MPDFile] =
{
var curr: Option[MPDFile] = None
var dir: Iterable[MPDFile] = Seq.empty
for(i <- path.split("/")){
if(i != ""){
dir = curr match {
case None => db.getFileDatabase.listRootDirectory.asScala
case Some(f) => {
if(!f.isDirectory){ return None }
db.getFileDatabase.listDirectory(f).asScala
}
}
curr = dir.find { _.getPath.split("/").last == i }
if(curr.isEmpty){ return None }
}
}
return curr
}
def savedPlaylists: Seq[String] =
db.getPlaylistDatabase.listPlaylists.asScala.toSeq
def savedPlaylist(name: String): Seq[MPDSong] =
db.getPlaylistDatabase.listPlaylistSongs(name).asScala.toSeq
def list(path: String): Seq[MPDFile] =
apply(path).toSeq.flatMap { db.getFileDatabase.listDirectory(_).asScala.toSeq }
def append(path: String): Boolean =
{
val file = apply(path).getOrElse { return false }
instance.getPlaylist.addSong(file.getPath)
return true
}
def play(): Unit =
instance.getPlayer.play()
def clear(): Unit =
instance.getPlaylist.clearPlaylist()
case class Song(title: String, artist: String, album: String, position: Int)
object Song
{
def apply(song: MPDSong): Song =
Song(
title = song.getTitle,
artist = song.getArtistName,
album = song.getAlbumName,
position = song.getPosition
)
implicit val rw: ReadWriter[Song] = macroRW[Song]
}
val status = new Checkpoint[String]("paused")
val playlist = new Checkpoint[Seq[Song]](Seq.empty)
val currentTrack = new Checkpoint[Option[Song]](None)
val position = new Checkpoint[Long](0)
override def setup(): Unit =
{
val bridge = config.obj.get("bridge").map { _.bool }.getOrElse(false)
val useProxy = config.obj.get("use_proxy").map { _.bool }.getOrElse(false)
implicit val owner: Owner = Owner.global
if(bridge && !useProxy)
{
Mqtt(s"$topic/status") << status
Mqtt(s"$topic/playlist") << playlist
Mqtt(s"$topic/now_playing") << currentTrack
Mqtt(s"$topic/position") << position
}
if(useProxy)
{
status << Mqtt(s"$topic/status").asString
playlist << Mqtt(s"$topic/playlist").asJson[Seq[Song]]
currentTrack << Mqtt(s"$topic/now_playing").asJson[Option[Song]]
position << Mqtt(s"$topic/position").asLong
} else
{
val monitor = instance.getMonitor
monitor.addPlayerChangeListener(new PlayerBasicChangeListener(){
def playerBasicChange(evt: PlayerBasicChangeEvent) =
status << (
evt.getStatus match {
case PlayerBasicChangeEvent.Status.PLAYER_PAUSED => "pause"
case PlayerBasicChangeEvent.Status.PLAYER_STARTED => "play"
case PlayerBasicChangeEvent.Status.PLAYER_STOPPED => "pause"
case PlayerBasicChangeEvent.Status.PLAYER_UNPAUSED => "play"
}
)
})
monitor.addPlaylistChangeListener(new PlaylistBasicChangeListener(){
def playlistBasicChange(evt: PlaylistBasicChangeEvent) =
evt.getEvent match {
case PlaylistBasicChangeEvent.Event.PLAYLIST_CHANGED
| PlaylistBasicChangeEvent.Event.PLAYLIST_ENDED =>
playlist << (
instance.getPlaylist.getSongList.asScala.toSeq.map {
Song(_)
}
)
currentTrack << None
case PlaylistBasicChangeEvent.Event.SONG_CHANGED =>
currentTrack << (
Option(instance.getPlaylist.getCurrentSong)
.map { Song(_) }
)
case _ => ()
}
})
monitor.addTrackPositionChangeListener(new TrackPositionChangeListener(){
def trackPositionChanged(evt: TrackPositionChangeEvent) =
position << evt.getElapsedTime
})
monitor.start()
}
}
}

View File

@ -1,4 +1,4 @@
package net.okennedy.shingle
package net.okennedy.shingle.component
import org.fusesource.mqtt.{ client => fuse }
import scala.collection.mutable
@ -8,13 +8,16 @@ import java.util.concurrent.TimeUnit
import org.fusesource.mqtt.client.Tracer
import org.fusesource.mqtt.codec.MQTTFrame
import com.typesafe.scalalogging.LazyLogging
import net.okennedy.shingle.Shingle
object Mqtt
extends Thread
with Component
with LazyLogging
{
lazy val config = Shingle.config("mqtt")
def host = config("host").str
def name = "MQTT Bus"
def slug = "mqtt"
lazy val host = config("host").str
lazy val mqtt = {
val mqtt = new fuse.MQTT()
@ -154,6 +157,8 @@ class MqttPath(val fullPath: String, parent: Option[MqttPath])
<<(value.toString.getBytes)
def <<[T: Writer](value: T): Unit =
<<(writeJs(value))
def <<[T: Writer](stream: Stream[T])(implicit owner: Owner): Unit =
stream.trigger { this << _ }
def apply(pathStep: String): MqttPath =
contents.getOrElseUpdate(

View File

@ -1,12 +1,14 @@
package net.okennedy.shingle.module
package net.okennedy.shingle.component
import net.okennedy.shingle.Shingle
import net.okennedy.shingle.Mqtt
import net.okennedy.shingle.stream.{Stream, Owner}
import net.okennedy.shingle.stream.{Stream, Sink, Owner}
object Notifications
object Notifications extends Component
{
lazy val config = Shingle.config("notification").obj
def name = "Notification System"
def slug = "notification"
override val depends = Set(Mqtt.slug)
lazy val topic =
config("topic").str
lazy val root =
@ -29,23 +31,26 @@ object Notifications
def label = "info"
}
class Heading(id: String)
class Heading(id: String)(implicit owner: Owner) extends Sink[Option[Notification]]
{
val target = root(id).isState
val last: Option[Notification] = null
def <<(msg: Stream[Option[Notification]])(implicit owner: Owner) =
def receive(n: Option[Notification]) =
{
msg.onAnyChangeIncludingFirst
.trigger {
case None =>
target << ""
case Some( notification ) =>
target << notification.json
}
if(last == null || n != last)
{
n match {
case None =>
target << ""
case Some( notification ) =>
target << notification.json
}
}
}
}
def apply(id: String) = new Heading(id)
def apply(id: String)(implicit owner: Owner) = new Heading(id)
}

View File

@ -0,0 +1,45 @@
package net.okennedy.shingle.component
import net.okennedy.shingle.Shingle
import org.fusesource.mqtt.client.QoS
import scala.util.Try
import scala.util.Failure
import scala.util.Success
import net.okennedy.shingle.stream.Owner
import net.okennedy.shingle.cron.IntervalCronEvent
import net.okennedy.shingle.cron.Cron
import scala.concurrent.duration._
import requests.RequestFailedException
import net.okennedy.shingle.stream._
case class RestAPI(
val url: String,
val headers: Iterable[(String, String)] = Seq.empty,
)
{
def get: Try[String] =
{
println(s"Fetching $url")
try {
val result = requests.get(
url = url,
headers = headers,
)
if(result.statusCode == 200){ Success(result.string()) }
else { Failure(new Exception(s"Result ${result.statusCode} when fetching $url")) }
} catch {
case e: RequestFailedException => Failure(e)
}
}
def poll(interval: FiniteDuration = 1.hour)(implicit owner: Owner): StringStream =
Timer.every(interval)
.flatMap { _ => print("Poll!"); get match {
case Success(r) =>
Some(r)
case Failure(err) =>
println(s"Error fetching $url: $err");
None
} }
}

View File

@ -1,4 +1,4 @@
package net.okennedy.shingle
package net.okennedy.shingle.component
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

View File

@ -0,0 +1,57 @@
package net.okennedy.shingle.component
import upickle.default._
import net.okennedy.shingle.cron.IntervalCronEvent
import java.time.Duration
import net.okennedy.shingle.cron.Cron
import net.okennedy.shingle.stream._
import scala.util.Success
import scala.util.Failure
import scala.concurrent.duration._
object Weather extends Component
{
def name = "Weather.gov Integration"
def slug = "weather"
lazy val api =
RestAPI(url = config("url").str)
lazy val state: Checkpoint[ujson.Value] =
{
val state =
api.poll(1.hour)(Owner.global)
.asJson[ujson.Value]
.asState(Owner.global)
try {
state.receive(ujson.read(api.get.get))
} catch {
case t: Throwable =>
println(s"Error initializing Weather. Will try again in 1 hour.\n${t.getMessage}")
}
state
}
def forecast =
ujson.transform(
state.now("properties")("periods"),
reader[Seq[WeatherPrediction]]
)
}
case class WeatherPrediction(
name: String,
isDaytime: Boolean,
temperature: Double,
temperatureUnit: String,
windSpeed: String,
windDirection: String,
icon: String,
shortForecast: String,
detailedForecast: String
)
object WeatherPrediction
{
implicit val rw: ReadWriter[WeatherPrediction] = macroRW[WeatherPrediction]
}

View File

@ -85,9 +85,8 @@ object Module
engine.eval(
s"""net.okennedy.shingle.module.Module("$module") { implicit ctx =>
| import net.okennedy.shingle._;
| import net.okennedy.shingle.component._;
| import net.okennedy.shingle.cron.Cron;
| import net.okennedy.shingle.module.Notification;
| import net.okennedy.shingle.module.Notifications;
| import scala.concurrent.duration._;
| $script
|}

View File

@ -0,0 +1,16 @@
package net.okennedy.shingle.stream
trait Sink[T]
{
def receive(value: T): Unit
def <<(source: Stream[T])(implicit owner: Owner): Sink[T] =
{
source.trigger { receive(_) }; this
}
def <<(value: T): Sink[T] =
{
receive(value); this
}
}

View File

@ -0,0 +1,24 @@
package net.okennedy.shingle.stream
trait State[T]
{
def now: T
}
class Checkpoint[T](init: Option[T] = None) extends State[T] with Sink[T] with Dispatchable[T]
{
private var __value: Option[T] = init
def this(init: T) = this(Some(init))
def now:T =
__value.getOrElse {
throw new RuntimeException("Checkpoint hasn't received a value yet")
}
def receive(incoming: T): Unit =
{
__value = Some(incoming)
dispatch(incoming)
}
}

View File

@ -4,9 +4,6 @@ import scala.collection.mutable
import upickle.default._
import ujson.Bool
import java.util.Arrays
import net.okennedy.shingle.MqttPath
import upickle.default._
import net.okennedy.shingle.Mqtt
trait Stream[T]
{
@ -24,6 +21,19 @@ trait Stream[T]
}
}
def flatMap[T2](f: T => Iterable[T2]): Stream[T2] =
{
def baseTrigger(handler: T => Unit, owner: Owner) = trigger(handler)(owner)
new Stream[T2] {
def trigger(handler: T2 => Unit)(implicit owner: Owner): Unit =
{
baseTrigger({
value => f(value).foreach { handler(_) }
}, owner)
}
}
}
def filter(f: T => Boolean): Stream[T] =
{
def baseTrigger(handler: T => Unit, owner: Owner) = trigger(handler)(owner)
@ -62,23 +72,16 @@ trait Stream[T]
def onAnyChangeIncludingFirst: TriggerableOnChange[T] =
new TriggerableOnChange(this, _ equals _, false) { }
def aliasAsJson(label: String)(implicit owner: Owner, writes: Writer[T]): MqttPath =
{
val ret = Mqtt(label)
trigger { ret << _ }
ret
}
def uniq = onAnyChange
def aliasAsString(label: String)(implicit owner: Owner): MqttPath =
def asState(implicit owner: Owner): Checkpoint[T] =
{
val ret = Mqtt(label)
trigger { ret << _.toString }
ret
val ret = new Checkpoint[T]()
ret << this
return ret
}
}
trait StreamMap[T1,T2] extends Stream[T2]
trait Dispatchable[T] extends Stream[T]
{
private val triggers = mutable.HashMap[String, Trigger[T]]()
@ -131,6 +134,7 @@ trait ByteArrayStream extends Stream[Array[Byte]]
{
def asString = map { new String(_) }
def asInt = map { new String(_).toInt }
def asLong = map { new String(_).toLong }
def asJson[T](implicit reader: Reader[T]) = map { ujson.transform(_, reader) }
override def onAnyChange: ByteArrayStream with TriggerableOnChange[Array[Byte]] =
@ -139,5 +143,28 @@ trait ByteArrayStream extends Stream[Array[Byte]]
override def onAnyChangeIncludingFirst: ByteArrayStream with TriggerableOnChange[Array[Byte]] =
new ByteArrayStream with TriggerableOnChange[Array[Byte]](this, Arrays.equals(_, _), false)
}
implicit class ConvertedByteArrayStream(source: Stream[Array[Byte]]) extends ByteArrayStream
{
def trigger(handler: Array[Byte] => Unit)(implicit owner: Owner): Unit =
source.trigger(handler)(owner)
}
trait StringStream extends Stream[String]
{
def asInt = map { _.toInt }
def asLong = map { _.toLong }
def asJson[T](implicit reader: Reader[T]) = map { ujson.transform(_, reader) }
override def onAnyChange: StringStream with TriggerableOnChange[String] =
new StringStream with TriggerableOnChange[String](this, _ == _)
override def onAnyChangeIncludingFirst: StringStream with TriggerableOnChange[String] =
new StringStream with TriggerableOnChange[String](this, _ == _, false)
}
implicit class ConvertedStringStream(source: Stream[String]) extends StringStream
{
def trigger(handler: String => Unit)(implicit owner: Owner): Unit =
source.trigger(handler)(owner)
}
class Trigger[T](val handler: T => Unit, val owner: Owner)