Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ RUN /dist/agent/main version
###############################################
# Build Bento4 -> we want fragmented mp4 files

ENV BENTO4_VERSION 1.6.0-641
ENV BENTO4_VERSION=1.6.0-641
RUN cd /tmp && git clone https://github.com/axiomatic-systems/Bento4 && cd Bento4 && \
git checkout tags/v${BENTO4_VERSION} && \
cd Build && \
Expand Down
12 changes: 8 additions & 4 deletions machinery/src/cloud/Cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@ import (
"github.com/kerberos-io/agent/machinery/src/packets"
"github.com/kerberos-io/agent/machinery/src/utils"
"github.com/kerberos-io/agent/machinery/src/webrtc"
xsd "github.com/kerberos-io/onvif/xsd"
)

var topicKinds = xsd.String("tns1:Device/Trigger//.") // -> This works for Avigilon, Hanwa, Hikvision
// var topicKinds = xsd.String("//.") // -> This works for Axis, but throws other errors.

func PendingUpload(configDirectory string) {
ff, err := utils.ReadDirectory(configDirectory + "/data/cloud/")
if err == nil {
Expand Down Expand Up @@ -239,7 +243,7 @@ func HandleHeartBeat(configuration *models.Configuration, communication *models.
cameraConfiguration := configuration.Config.Capture.IPCamera
device, _, err := onvif.ConnectToOnvifDevice(&cameraConfiguration)
if err != nil {
pullPointAddressLoopState, err = onvif.CreatePullPointSubscription(device)
pullPointAddressLoopState, err = onvif.CreatePullPointSubscription(device, topicKinds)
if err != nil {
log.Log.Error("cloud.HandleHeartBeat(): error while creating pull point subscription: " + err.Error())
}
Expand Down Expand Up @@ -306,7 +310,7 @@ loop:
// - In this scenario we are creating a new subscription to retrieve the initial (current) state of the inputs and outputs.

// Get a new pull point address, to get the initiatal state of the inputs and outputs.
pullPointAddressInitialState, err := onvif.CreatePullPointSubscription(device)
pullPointAddressInitialState, err := onvif.CreatePullPointSubscription(device, topicKinds)
if err != nil {
log.Log.Error("cloud.HandleHeartBeat(): error while creating pull point subscription: " + err.Error())
}
Expand Down Expand Up @@ -344,7 +348,7 @@ loop:
} else if err != nil {
log.Log.Error("cloud.HandleHeartBeat(): error while getting events: " + err.Error())
onvifEventsList = []byte("[]")
pullPointAddressLoopState, err = onvif.CreatePullPointSubscription(device)
pullPointAddressLoopState, err = onvif.CreatePullPointSubscription(device, topicKinds)
if err != nil {
log.Log.Error("cloud.HandleHeartBeat(): error while creating pull point subscription: " + err.Error())
}
Expand All @@ -354,7 +358,7 @@ loop:
}
} else {
log.Log.Debug("cloud.HandleHeartBeat(): no pull point address found.")
pullPointAddressLoopState, err = onvif.CreatePullPointSubscription(device)
pullPointAddressLoopState, err = onvif.CreatePullPointSubscription(device, topicKinds)
if err != nil {
log.Log.Error("cloud.HandleHeartBeat(): error while creating pull point subscription: " + err.Error())
}
Expand Down
11 changes: 11 additions & 0 deletions machinery/src/computervision/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/kerberos-io/agent/machinery/src/log"
"github.com/kerberos-io/agent/machinery/src/models"
"github.com/kerberos-io/agent/machinery/src/packets"
"github.com/kerberos-io/agent/machinery/src/onvif"
)

func ProcessMotion(motionCursor *packets.QueueCursor, configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, rtspClient capture.RTSPClient) {
Expand All @@ -32,6 +33,16 @@ func ProcessMotion(motionCursor *packets.QueueCursor, configuration *models.Conf

log.Log.Info("computervision.main.ProcessMotion(): you've enabled continuous recording, so no motion detection required.")

} else if config.Capture.MotionONVIF == "true" {

log.Log.Info("computervision.main.ProcessMotion(): ONVIF motion detected is enabled, so creating subscription to ONVIF motion events.")

err := onvif.ProcessONVIFMotion(configuration, communication)

if err != nil {
log.Log.Error("computervision.main.ProcessMotion(): ONVIF motion detected failed: " + err.Error())
}

} else {

log.Log.Info("computervision.main.ProcessMotion(): motion detected is enabled, so starting the motion detection.")
Expand Down
3 changes: 3 additions & 0 deletions machinery/src/config/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,9 @@ func OverrideWithEnvironmentVariables(configuration *models.Configuration) {
case "AGENT_CAPTURE_MOTION":
configuration.Config.Capture.Motion = value
break
case "AGENT_CAPTURE_MOTION_ONVIF":
configuration.Config.Capture.MotionONVIF = value
break
case "AGENT_CAPTURE_SNAPSHOTS":
configuration.Config.Capture.Snapshots = value
break
Expand Down
1 change: 1 addition & 0 deletions machinery/src/models/Communication.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Communication struct {
HandleAudio chan AudioDataPartial
HandleUpload chan string
HandleHeartBeat chan string
ProcessONVIFMotion chan string
HandleLiveSD chan int64
HandleLiveHDKeepalive chan string
HandleLiveHDHandshake chan RequestHDStreamPayload
Expand Down
1 change: 1 addition & 0 deletions machinery/src/models/Config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Capture struct {
Recording string `json:"recording,omitempty"`
Snapshots string `json:"snapshots,omitempty"`
Motion string `json:"motion,omitempty"`
MotionONVIF string `json:"motiononvif,omitempty"`
Liveview string `json:"liveview,omitempty"`
Continuous string `json:"continuous,omitempty"`
PostRecording int64 `json:"postrecording"`
Expand Down
123 changes: 120 additions & 3 deletions machinery/src/onvif/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,7 +973,7 @@ type ONVIFEvents struct {
}

// Create PullPointSubscription
func CreatePullPointSubscription(dev *onvif.Device) (string, error) {
func CreatePullPointSubscription(dev *onvif.Device, topicKinds xsd.String) (string, error) {

// We'll create a subscription to the device
// This will allow us to receive events from the device
Expand All @@ -994,8 +994,7 @@ func CreatePullPointSubscription(dev *onvif.Device) (string, error) {
Filter: &event.FilterType{
TopicExpression: &event.TopicExpressionType{
Dialect: xsd.String("http://www.onvif.org/ver10/tev/topicExpression/ConcreteSet"),
TopicKinds: "tns1:Device/Trigger//.", // -> This works for Avigilon, Hanwa, Hikvision
// TopicKinds: "//.", -> This works for Axis, but throws other errors.
TopicKinds: topicKinds,
},
},
})
Expand Down Expand Up @@ -1201,6 +1200,124 @@ func GetEventMessages(dev *onvif.Device, pullPointAddress string) ([]ONVIFEvents
return eventsArray, err
}

func ProcessONVIFMotion(configuration *models.Configuration, communication *models.Communication) (error){
cameraConfiguration := configuration.Config.Capture.IPCamera
topicKinds := xsd.String("tns1:RuleEngine/CellMotionDetector/Motion")
var pullPointAddress string
var device *onvif.Device

defer func() {
if pullPointAddress != "" && device != nil {
log.Log.Debug("onvif.main.ProcessONVIFMotion(): Unsubscribing from pullPointAddress")
UnsubscribePullPoint(device, pullPointAddress)
}
}()

for {
select {
case <-communication.ProcessONVIFMotion:
log.Log.Debug("onvif.main.ProcessONVIFMotion(): Stopping motion processing due to reconfiguration")
return nil
case <-time.After(10 * time.Second):
}

if cameraConfiguration.ONVIFXAddr != "" {
device, _, err := ConnectToOnvifDevice(&cameraConfiguration)
if err != nil {
log.Log.Error("onvif.main.ProcessONVIFMotion(): error while connecting to ONVIF device: " + err.Error())
continue
}

if pullPointAddress != "" {
log.Log.Debug("onvif.main.ProcessONVIFMotion(): Fetching events from pullPointAddress")
notifications := GetNotificationMessage(device, pullPointAddress)
log.Log.Debug("onvif.main.ProcessONVIFMotion(): Completed fetching events from pullPointAddress")
if notifications == nil {
log.Log.Error("onvif.main.ProcessONVIFMotion(): Error while getting events. Notifications were nil.")
continue
}
for _, msg := range notifications {
for _, item := range msg.Message.Message.Data.SimpleItem {
if item.Name == "IsMotion" && item.Value == "true" {
log.Log.Debug("onvif.main.ProcessONVIFMotion(): Motion detected!")
dataToPass := models.MotionDataPartial{
Timestamp: time.Now().Unix(),
NumberOfChanges: 1000,
}
communication.HandleMotion <- dataToPass
} else if item.Name == "IsMotion" && item.Value == "false" {
log.Log.Debug("onvif.main.ProcessONVIFMotion(): Motion no longer detected!")
}
}
}
} else {
log.Log.Debug("onvif.main.ProcessONVIFMotion(): no pull point address found. Creating subscription.")
pullPointAddress, err = CreatePullPointSubscription(device, topicKinds)
if err != nil {
log.Log.Error("onvif.main.ProcessONVIFMotion(): error while creating pull point subscription: " + err.Error())
}
}
} else {
log.Log.Debug("onvif.main.ProcessONVIFMotion(): ONVIF is not enabled. Sleeping for 60 seconds.")
time.Sleep(60 * time.Second)
}
}
}


func GetNotificationMessage(dev *onvif.Device, pullPointAddress string) ([]event.NotificationMessage) {
type Body struct {
PullMessagesResponse event.PullMessagesResponse `xml:"PullMessagesResponse"`
}

type Envelope struct {
XMLName xml.Name `xml:"Envelope"`
Body Body `xml:"Body"`
}

pullMessage := event.PullMessages{
Timeout: xsd.Duration("PT5S"),
MessageLimit: 10,
}

if pullPointAddress == "" {
log.Log.Error("onvif.main.GetNotificationMessage(): empty pullPointAddress")
return nil
}

requestBody, err := xml.Marshal(pullMessage)
if err != nil {
log.Log.Error("onvif.main.GetNotificationMessage(): " + err.Error())
return nil
}

res, err := dev.SendSoap(string(pullPointAddress), string(requestBody))
if err != nil {
log.Log.Error("onvif.main.GetNotificationMessage(): " + err.Error())
return nil
}
if res.Body == nil {
log.Log.Error("onvif.main.GetNotificationMessage(): empty response Body")
return nil
}

defer res.Body.Close()
data, err := io.ReadAll(res.Body)
if err != nil {
log.Log.Error("onvif.main.GetNotificationMessage(): Failed to read response body: " + err.Error())
return nil
}

var envelope Envelope
if err := xml.Unmarshal(data, &envelope); err != nil {
log.Log.Error("onvif.main.GetNotificationMessage(): Failed to parse PullMessages XML: " + err.Error())
return nil;
}
log.Log.Debug("onvif.main.GetNotificationMessage(): Envelope unmarshalled")

return envelope.Body.PullMessagesResponse.NotificationMessage
}

// This method will get the digital inputs from the device.
// But will not give any status information.
func GetDigitalInputs(dev *onvif.Device) (device.GetDigitalInputsResponse, error) {
Expand Down