diff --git a/Dockerfile b/Dockerfile index 165a3cc1..e4a3be33 100644 --- a/Dockerfile +++ b/Dockerfile @@ -62,7 +62,7 @@ RUN /dist/agent/main version ############################################### # Build Bento4 -> we want fragmented mp4 files -ENV BENTO4_VERSION 1.6.0-641 +ENV BENTO4_VERSION=1.6.0-641 RUN cd /tmp && git clone https://github.com/axiomatic-systems/Bento4 && cd Bento4 && \ git checkout tags/v${BENTO4_VERSION} && \ cd Build && \ diff --git a/machinery/src/cloud/Cloud.go b/machinery/src/cloud/Cloud.go index d6ba5b34..a8cadc22 100644 --- a/machinery/src/cloud/Cloud.go +++ b/machinery/src/cloud/Cloud.go @@ -28,8 +28,12 @@ import ( "github.com/kerberos-io/agent/machinery/src/packets" "github.com/kerberos-io/agent/machinery/src/utils" "github.com/kerberos-io/agent/machinery/src/webrtc" + xsd "github.com/kerberos-io/onvif/xsd" ) +var topicKinds = xsd.String("tns1:Device/Trigger//.") // -> This works for Avigilon, Hanwa, Hikvision +// var topicKinds = xsd.String("//.") // -> This works for Axis, but throws other errors. + func PendingUpload(configDirectory string) { ff, err := utils.ReadDirectory(configDirectory + "/data/cloud/") if err == nil { @@ -239,7 +243,7 @@ func HandleHeartBeat(configuration *models.Configuration, communication *models. cameraConfiguration := configuration.Config.Capture.IPCamera device, _, err := onvif.ConnectToOnvifDevice(&cameraConfiguration) if err != nil { - pullPointAddressLoopState, err = onvif.CreatePullPointSubscription(device) + pullPointAddressLoopState, err = onvif.CreatePullPointSubscription(device, topicKinds) if err != nil { log.Log.Error("cloud.HandleHeartBeat(): error while creating pull point subscription: " + err.Error()) } @@ -306,7 +310,7 @@ loop: // - In this scenario we are creating a new subscription to retrieve the initial (current) state of the inputs and outputs. // Get a new pull point address, to get the initiatal state of the inputs and outputs. - pullPointAddressInitialState, err := onvif.CreatePullPointSubscription(device) + pullPointAddressInitialState, err := onvif.CreatePullPointSubscription(device, topicKinds) if err != nil { log.Log.Error("cloud.HandleHeartBeat(): error while creating pull point subscription: " + err.Error()) } @@ -344,7 +348,7 @@ loop: } else if err != nil { log.Log.Error("cloud.HandleHeartBeat(): error while getting events: " + err.Error()) onvifEventsList = []byte("[]") - pullPointAddressLoopState, err = onvif.CreatePullPointSubscription(device) + pullPointAddressLoopState, err = onvif.CreatePullPointSubscription(device, topicKinds) if err != nil { log.Log.Error("cloud.HandleHeartBeat(): error while creating pull point subscription: " + err.Error()) } @@ -354,7 +358,7 @@ loop: } } else { log.Log.Debug("cloud.HandleHeartBeat(): no pull point address found.") - pullPointAddressLoopState, err = onvif.CreatePullPointSubscription(device) + pullPointAddressLoopState, err = onvif.CreatePullPointSubscription(device, topicKinds) if err != nil { log.Log.Error("cloud.HandleHeartBeat(): error while creating pull point subscription: " + err.Error()) } diff --git a/machinery/src/computervision/main.go b/machinery/src/computervision/main.go index ca70ac91..1bd8643d 100644 --- a/machinery/src/computervision/main.go +++ b/machinery/src/computervision/main.go @@ -11,6 +11,7 @@ import ( "github.com/kerberos-io/agent/machinery/src/log" "github.com/kerberos-io/agent/machinery/src/models" "github.com/kerberos-io/agent/machinery/src/packets" + "github.com/kerberos-io/agent/machinery/src/onvif" ) func ProcessMotion(motionCursor *packets.QueueCursor, configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, rtspClient capture.RTSPClient) { @@ -32,6 +33,16 @@ func ProcessMotion(motionCursor *packets.QueueCursor, configuration *models.Conf log.Log.Info("computervision.main.ProcessMotion(): you've enabled continuous recording, so no motion detection required.") + } else if config.Capture.MotionONVIF == "true" { + + log.Log.Info("computervision.main.ProcessMotion(): ONVIF motion detected is enabled, so creating subscription to ONVIF motion events.") + + err := onvif.ProcessONVIFMotion(configuration, communication) + + if err != nil { + log.Log.Error("computervision.main.ProcessMotion(): ONVIF motion detected failed: " + err.Error()) + } + } else { log.Log.Info("computervision.main.ProcessMotion(): motion detected is enabled, so starting the motion detection.") diff --git a/machinery/src/config/main.go b/machinery/src/config/main.go index 2bc0a62c..7d679f97 100644 --- a/machinery/src/config/main.go +++ b/machinery/src/config/main.go @@ -266,6 +266,9 @@ func OverrideWithEnvironmentVariables(configuration *models.Configuration) { case "AGENT_CAPTURE_MOTION": configuration.Config.Capture.Motion = value break + case "AGENT_CAPTURE_MOTION_ONVIF": + configuration.Config.Capture.MotionONVIF = value + break case "AGENT_CAPTURE_SNAPSHOTS": configuration.Config.Capture.Snapshots = value break diff --git a/machinery/src/models/Communication.go b/machinery/src/models/Communication.go index 262a86f8..de369ce8 100644 --- a/machinery/src/models/Communication.go +++ b/machinery/src/models/Communication.go @@ -25,6 +25,7 @@ type Communication struct { HandleAudio chan AudioDataPartial HandleUpload chan string HandleHeartBeat chan string + ProcessONVIFMotion chan string HandleLiveSD chan int64 HandleLiveHDKeepalive chan string HandleLiveHDHandshake chan RequestHDStreamPayload diff --git a/machinery/src/models/Config.go b/machinery/src/models/Config.go index a6bbadf8..e31c0c54 100644 --- a/machinery/src/models/Config.go +++ b/machinery/src/models/Config.go @@ -60,6 +60,7 @@ type Capture struct { Recording string `json:"recording,omitempty"` Snapshots string `json:"snapshots,omitempty"` Motion string `json:"motion,omitempty"` + MotionONVIF string `json:"motiononvif,omitempty"` Liveview string `json:"liveview,omitempty"` Continuous string `json:"continuous,omitempty"` PostRecording int64 `json:"postrecording"` diff --git a/machinery/src/onvif/main.go b/machinery/src/onvif/main.go index 45623d77..9a78c8ac 100644 --- a/machinery/src/onvif/main.go +++ b/machinery/src/onvif/main.go @@ -973,7 +973,7 @@ type ONVIFEvents struct { } // Create PullPointSubscription -func CreatePullPointSubscription(dev *onvif.Device) (string, error) { +func CreatePullPointSubscription(dev *onvif.Device, topicKinds xsd.String) (string, error) { // We'll create a subscription to the device // This will allow us to receive events from the device @@ -994,8 +994,7 @@ func CreatePullPointSubscription(dev *onvif.Device) (string, error) { Filter: &event.FilterType{ TopicExpression: &event.TopicExpressionType{ Dialect: xsd.String("http://www.onvif.org/ver10/tev/topicExpression/ConcreteSet"), - TopicKinds: "tns1:Device/Trigger//.", // -> This works for Avigilon, Hanwa, Hikvision - // TopicKinds: "//.", -> This works for Axis, but throws other errors. + TopicKinds: topicKinds, }, }, }) @@ -1201,6 +1200,124 @@ func GetEventMessages(dev *onvif.Device, pullPointAddress string) ([]ONVIFEvents return eventsArray, err } +func ProcessONVIFMotion(configuration *models.Configuration, communication *models.Communication) (error){ + cameraConfiguration := configuration.Config.Capture.IPCamera + topicKinds := xsd.String("tns1:RuleEngine/CellMotionDetector/Motion") + var pullPointAddress string + var device *onvif.Device + + defer func() { + if pullPointAddress != "" && device != nil { + log.Log.Debug("onvif.main.ProcessONVIFMotion(): Unsubscribing from pullPointAddress") + UnsubscribePullPoint(device, pullPointAddress) + } + }() + + for { + select { + case <-communication.ProcessONVIFMotion: + log.Log.Debug("onvif.main.ProcessONVIFMotion(): Stopping motion processing due to reconfiguration") + return nil + case <-time.After(10 * time.Second): + } + + if cameraConfiguration.ONVIFXAddr != "" { + device, _, err := ConnectToOnvifDevice(&cameraConfiguration) + if err != nil { + log.Log.Error("onvif.main.ProcessONVIFMotion(): error while connecting to ONVIF device: " + err.Error()) + continue + } + + if pullPointAddress != "" { + log.Log.Debug("onvif.main.ProcessONVIFMotion(): Fetching events from pullPointAddress") + notifications := GetNotificationMessage(device, pullPointAddress) + log.Log.Debug("onvif.main.ProcessONVIFMotion(): Completed fetching events from pullPointAddress") + if notifications == nil { + log.Log.Error("onvif.main.ProcessONVIFMotion(): Error while getting events. Notifications were nil.") + continue + } + for _, msg := range notifications { + for _, item := range msg.Message.Message.Data.SimpleItem { + if item.Name == "IsMotion" && item.Value == "true" { + log.Log.Debug("onvif.main.ProcessONVIFMotion(): Motion detected!") + dataToPass := models.MotionDataPartial{ + Timestamp: time.Now().Unix(), + NumberOfChanges: 1000, + } + communication.HandleMotion <- dataToPass + } else if item.Name == "IsMotion" && item.Value == "false" { + log.Log.Debug("onvif.main.ProcessONVIFMotion(): Motion no longer detected!") + } + } + } + } else { + log.Log.Debug("onvif.main.ProcessONVIFMotion(): no pull point address found. Creating subscription.") + pullPointAddress, err = CreatePullPointSubscription(device, topicKinds) + if err != nil { + log.Log.Error("onvif.main.ProcessONVIFMotion(): error while creating pull point subscription: " + err.Error()) + } + } + } else { + log.Log.Debug("onvif.main.ProcessONVIFMotion(): ONVIF is not enabled. Sleeping for 60 seconds.") + time.Sleep(60 * time.Second) + } + } +} + + +func GetNotificationMessage(dev *onvif.Device, pullPointAddress string) ([]event.NotificationMessage) { + type Body struct { + PullMessagesResponse event.PullMessagesResponse `xml:"PullMessagesResponse"` + } + + type Envelope struct { + XMLName xml.Name `xml:"Envelope"` + Body Body `xml:"Body"` + } + + pullMessage := event.PullMessages{ + Timeout: xsd.Duration("PT5S"), + MessageLimit: 10, + } + + if pullPointAddress == "" { + log.Log.Error("onvif.main.GetNotificationMessage(): empty pullPointAddress") + return nil + } + + requestBody, err := xml.Marshal(pullMessage) + if err != nil { + log.Log.Error("onvif.main.GetNotificationMessage(): " + err.Error()) + return nil + } + + res, err := dev.SendSoap(string(pullPointAddress), string(requestBody)) + if err != nil { + log.Log.Error("onvif.main.GetNotificationMessage(): " + err.Error()) + return nil + } + if res.Body == nil { + log.Log.Error("onvif.main.GetNotificationMessage(): empty response Body") + return nil + } + + defer res.Body.Close() + data, err := io.ReadAll(res.Body) + if err != nil { + log.Log.Error("onvif.main.GetNotificationMessage(): Failed to read response body: " + err.Error()) + return nil + } + + var envelope Envelope + if err := xml.Unmarshal(data, &envelope); err != nil { + log.Log.Error("onvif.main.GetNotificationMessage(): Failed to parse PullMessages XML: " + err.Error()) + return nil; + } + log.Log.Debug("onvif.main.GetNotificationMessage(): Envelope unmarshalled") + + return envelope.Body.PullMessagesResponse.NotificationMessage +} + // This method will get the digital inputs from the device. // But will not give any status information. func GetDigitalInputs(dev *onvif.Device) (device.GetDigitalInputsResponse, error) {