Refactor Triggerable -> Stream
parent
8b1f959084
commit
2ca33e0ef8
|
@ -1,6 +1,7 @@
|
|||
package net.okennedy.shingle
|
||||
|
||||
import upickle.default._
|
||||
import net.okennedy.shingle.stream.Owner
|
||||
|
||||
object Hermes
|
||||
{
|
||||
|
@ -11,7 +12,7 @@ object Hermes
|
|||
lazy val defaultSiteId =
|
||||
defaults("siteId").str
|
||||
|
||||
def registerIntent(id: String)(handler: Map[String, ujson.Value] => Unit)(implicit owner: module.Owner) =
|
||||
def registerIntent(id: String)(handler: Map[String, ujson.Value] => Unit)(implicit owner: Owner) =
|
||||
{
|
||||
Mqtt(s"hermes/intent/$id").asJson[ujson.Value].trigger { payload =>
|
||||
println(s"Hermes Intent: $id")
|
||||
|
|
|
@ -2,7 +2,7 @@ package net.okennedy.shingle
|
|||
|
||||
import org.fusesource.mqtt.{ client => fuse }
|
||||
import scala.collection.mutable
|
||||
import net.okennedy.shingle.module._
|
||||
import net.okennedy.shingle.stream._
|
||||
import upickle.default._
|
||||
import java.util.concurrent.TimeUnit
|
||||
import org.fusesource.mqtt.client.Tracer
|
||||
|
@ -63,7 +63,7 @@ object Mqtt
|
|||
}
|
||||
|
||||
class MqttPath(val fullPath: String, parent: Option[MqttPath])
|
||||
extends TriggerableByteArray
|
||||
extends ByteArrayStream
|
||||
with Dispatchable[Array[Byte]]
|
||||
with LazyLogging
|
||||
{
|
||||
|
|
|
@ -6,7 +6,7 @@ import org.fusesource.mqtt.client.QoS
|
|||
import scala.util.Try
|
||||
import scala.util.Failure
|
||||
import scala.util.Success
|
||||
import net.okennedy.shingle.module.Owner
|
||||
import net.okennedy.shingle.stream.Owner
|
||||
import net.okennedy.shingle.cron.IntervalCronEvent
|
||||
import net.okennedy.shingle.cron.Cron
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@ import java.time.LocalDateTime
|
|||
import java.time.temporal.ChronoUnit
|
||||
import org.fusesource.mqtt.client.QoS
|
||||
import upickle.default._
|
||||
import net.okennedy.shingle.module.Triggerable
|
||||
import net.okennedy.shingle.stream.Stream
|
||||
|
||||
class Timer(
|
||||
topic: MqttPath,
|
||||
|
@ -22,7 +22,7 @@ class Timer(
|
|||
def go(
|
||||
end: LocalDateTime,
|
||||
start: LocalDateTime = LocalDateTime.now
|
||||
): Triggerable[TimerStatus] =
|
||||
): Stream[TimerStatus] =
|
||||
{
|
||||
aborted = false
|
||||
this.endTime = end
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
package net.okennedy.shingle
|
||||
|
||||
import net.okennedy.shingle.module.Owner
|
||||
import upickle.default._
|
||||
import net.okennedy.shingle.cron.IntervalCronEvent
|
||||
import java.time.Duration
|
||||
import net.okennedy.shingle.cron.Cron
|
||||
import net.okennedy.shingle.stream.Owner
|
||||
import scala.util.Success
|
||||
import scala.util.Failure
|
||||
|
||||
|
|
|
@ -5,7 +5,7 @@ import math.Ordered.orderingToOrdered
|
|||
import java.time.Clock
|
||||
import java.time.temporal.TemporalUnit
|
||||
import java.time.temporal.ChronoUnit
|
||||
import net.okennedy.shingle.module.Owner
|
||||
import net.okennedy.shingle.stream.Owner
|
||||
|
||||
object Cron
|
||||
{
|
||||
|
|
|
@ -1,15 +1,15 @@
|
|||
package net.okennedy.shingle.cron
|
||||
|
||||
import net.okennedy.shingle.module.Triggerable
|
||||
import net.okennedy.shingle.module.Owner
|
||||
import net.okennedy.shingle.module.Dispatchable
|
||||
import net.okennedy.shingle.stream.Owner
|
||||
import net.okennedy.shingle.stream.Dispatchable
|
||||
import net.okennedy.shingle.stream.Stream
|
||||
|
||||
object TimeRangeTriggerable
|
||||
{
|
||||
def cron(
|
||||
start: String,
|
||||
end: String
|
||||
)(implicit owner: Owner): Triggerable[Boolean] =
|
||||
)(implicit owner: Owner): Stream[Boolean] =
|
||||
{
|
||||
val stream = new Dispatchable[Boolean]() {}
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@ import dotty.tools.repl.ScriptEngine
|
|||
import java.io.File
|
||||
import scala.io.Source
|
||||
import java.io.FileWriter
|
||||
import net.okennedy.shingle.stream._
|
||||
|
||||
object Module
|
||||
{
|
||||
|
@ -84,10 +85,10 @@ object Module
|
|||
engine.eval(
|
||||
s"""net.okennedy.shingle.module.Module("$module") { implicit ctx =>
|
||||
| import net.okennedy.shingle._;
|
||||
| import net.okennedy.shingle.cron.Cron
|
||||
| import net.okennedy.shingle.module.Notification
|
||||
| import net.okennedy.shingle.module.Notifications
|
||||
| import net.okennedy.shingle.cron.TimeRangeTriggerable
|
||||
| import net.okennedy.shingle.cron.Cron;
|
||||
| import net.okennedy.shingle.module.Notification;
|
||||
| import net.okennedy.shingle.module.Notifications;
|
||||
| import net.okennedy.shingle.cron.TimeRangeTriggerable;
|
||||
| $script
|
||||
|}
|
||||
|""".stripMargin,
|
||||
|
|
|
@ -2,6 +2,7 @@ package net.okennedy.shingle.module
|
|||
|
||||
import net.okennedy.shingle.Shingle
|
||||
import net.okennedy.shingle.Mqtt
|
||||
import net.okennedy.shingle.stream.{Stream, Owner}
|
||||
|
||||
object Notifications
|
||||
{
|
||||
|
@ -32,7 +33,7 @@ object Notifications
|
|||
{
|
||||
val target = root(id).isState
|
||||
|
||||
def <<(msg: Triggerable[Option[Notification]])(implicit owner: Owner) =
|
||||
def <<(msg: Stream[Option[Notification]])(implicit owner: Owner) =
|
||||
{
|
||||
msg.onAnyChangeIncludingFirst
|
||||
.trigger {
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
package net.okennedy.shingle.module
|
||||
package net.okennedy.shingle.stream
|
||||
|
||||
import java.util.Timer
|
||||
import java.util.TimerTask
|
||||
|
||||
class DebouncingTriggerable[T](
|
||||
class DebouncingStream[T](
|
||||
delayMillis: Long,
|
||||
) extends Dispatchable[T]
|
||||
{
|
||||
|
@ -21,11 +21,11 @@ class DebouncingTriggerable[T](
|
|||
}
|
||||
}
|
||||
timer = Some(task)
|
||||
DebouncingTriggerable.timer.schedule(task, delayMillis)
|
||||
DebouncingStream.timer.schedule(task, delayMillis)
|
||||
}
|
||||
}
|
||||
|
||||
object DebouncingTriggerable
|
||||
object DebouncingStream
|
||||
{
|
||||
val timer = new Timer("Debounce Timer", true);
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
package net.okennedy.shingle.module
|
||||
package net.okennedy.shingle.stream
|
||||
|
||||
class JoinTriggerable[T1,T2](base1:Triggerable[T1], base2:Triggerable[T2])(implicit owner: Owner)
|
||||
class JoinStream[T1,T2](base1:Stream[T1], base2:Stream[T2])(implicit owner: Owner)
|
||||
extends Dispatchable[(T1,T2)]
|
||||
{
|
||||
var last1: Option[T1] = None
|
|
@ -1,4 +1,4 @@
|
|||
package net.okennedy.shingle.module
|
||||
package net.okennedy.shingle.stream
|
||||
|
||||
import scala.collection.mutable
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
package net.okennedy.shingle.module
|
||||
package net.okennedy.shingle.stream
|
||||
|
||||
import scala.collection.mutable
|
||||
import upickle.default._
|
||||
|
@ -8,15 +8,15 @@ import net.okennedy.shingle.MqttPath
|
|||
import upickle.default._
|
||||
import net.okennedy.shingle.Mqtt
|
||||
|
||||
trait Triggerable[T]
|
||||
trait Stream[T]
|
||||
{
|
||||
|
||||
def trigger(handler: T => Unit)(implicit owner: Owner): Unit
|
||||
|
||||
def map[T2](f: T => T2): Triggerable[T2] =
|
||||
def map[T2](f: T => T2): Stream[T2] =
|
||||
{
|
||||
def baseTrigger(handler: T => Unit, owner: Owner) = trigger(handler)(owner)
|
||||
new Triggerable[T2] {
|
||||
new Stream[T2] {
|
||||
def trigger(handler: T2 => Unit)(implicit owner: Owner): Unit =
|
||||
{
|
||||
baseTrigger({ value => handler(f(value)) }, owner)
|
||||
|
@ -24,10 +24,10 @@ trait Triggerable[T]
|
|||
}
|
||||
}
|
||||
|
||||
def filter(f: T => Boolean): Triggerable[T] =
|
||||
def filter(f: T => Boolean): Stream[T] =
|
||||
{
|
||||
def baseTrigger(handler: T => Unit, owner: Owner) = trigger(handler)(owner)
|
||||
new Triggerable[T] {
|
||||
new Stream[T] {
|
||||
def trigger(handler: T => Unit)(implicit owner: Owner): Unit =
|
||||
{
|
||||
baseTrigger({ case v if f(v) => handler(v); case _ => () }, owner)
|
||||
|
@ -35,10 +35,10 @@ trait Triggerable[T]
|
|||
}
|
||||
}
|
||||
|
||||
def filterOption(f: T => Boolean): Triggerable[Option[T]] =
|
||||
def filterOption(f: T => Boolean): Stream[Option[T]] =
|
||||
{
|
||||
def baseTrigger(handler: T => Unit, owner: Owner) = trigger(handler)(owner)
|
||||
new Triggerable[Option[T]] {
|
||||
new Stream[Option[T]] {
|
||||
def trigger(handler: Option[T] => Unit)(implicit owner: Owner): Unit =
|
||||
{
|
||||
baseTrigger({ case v if f(v) => handler(Some(v)); case _ => handler(None) }, owner)
|
||||
|
@ -46,12 +46,12 @@ trait Triggerable[T]
|
|||
}
|
||||
}
|
||||
|
||||
def join[T2](other: Triggerable[T2])(implicit owner: Owner): JoinTriggerable[T, T2] =
|
||||
new JoinTriggerable(this, other)
|
||||
def join[T2](other: Stream[T2])(implicit owner: Owner): JoinStream[T, T2] =
|
||||
new JoinStream(this, other)
|
||||
|
||||
def debounce(delayMillis: Long)(implicit owner: Owner): DebouncingTriggerable[T] =
|
||||
def debounce(delayMillis: Long)(implicit owner: Owner): DebouncingStream[T] =
|
||||
{
|
||||
val ret = new DebouncingTriggerable[T](delayMillis);
|
||||
val ret = new DebouncingStream[T](delayMillis);
|
||||
trigger { x => ret.handle(x) }
|
||||
return ret
|
||||
}
|
||||
|
@ -77,9 +77,9 @@ trait Triggerable[T]
|
|||
}
|
||||
}
|
||||
|
||||
trait TriggerableMap[T1,T2] extends Triggerable[T2]
|
||||
trait StreamMap[T1,T2] extends Stream[T2]
|
||||
|
||||
trait Dispatchable[T] extends Triggerable[T]
|
||||
trait Dispatchable[T] extends Stream[T]
|
||||
{
|
||||
private val triggers = mutable.HashMap[String, Trigger[T]]()
|
||||
|
||||
|
@ -110,10 +110,10 @@ trait Dispatchable[T] extends Triggerable[T]
|
|||
}
|
||||
|
||||
trait TriggerableOnChange[T](
|
||||
base: Triggerable[T],
|
||||
base: Stream[T],
|
||||
cmp: (T, T) => Boolean,
|
||||
skipInitial: Boolean = true,
|
||||
) extends Triggerable[T]
|
||||
) extends Stream[T]
|
||||
{
|
||||
var prev:Option[T] = None
|
||||
|
||||
|
@ -127,17 +127,17 @@ trait TriggerableOnChange[T](
|
|||
}
|
||||
|
||||
}
|
||||
trait TriggerableByteArray extends Triggerable[Array[Byte]]
|
||||
trait ByteArrayStream extends Stream[Array[Byte]]
|
||||
{
|
||||
def asString = map { new String(_) }
|
||||
def asInt = map { new String(_).toInt }
|
||||
def asJson[T](implicit reader: Reader[T]) = map { ujson.transform(_, reader) }
|
||||
|
||||
override def onAnyChange: TriggerableByteArray with TriggerableOnChange[Array[Byte]] =
|
||||
new TriggerableByteArray with TriggerableOnChange[Array[Byte]](this, Arrays.equals(_, _))
|
||||
override def onAnyChange: ByteArrayStream with TriggerableOnChange[Array[Byte]] =
|
||||
new ByteArrayStream with TriggerableOnChange[Array[Byte]](this, Arrays.equals(_, _))
|
||||
|
||||
override def onAnyChangeIncludingFirst: TriggerableByteArray with TriggerableOnChange[Array[Byte]] =
|
||||
new TriggerableByteArray with TriggerableOnChange[Array[Byte]](this, Arrays.equals(_, _), false)
|
||||
override def onAnyChangeIncludingFirst: ByteArrayStream with TriggerableOnChange[Array[Byte]] =
|
||||
new ByteArrayStream with TriggerableOnChange[Array[Byte]](this, Arrays.equals(_, _), false)
|
||||
}
|
||||
|
||||
class Trigger[T](val handler: T => Unit, val owner: Owner)
|
Loading…
Reference in New Issue