diff --git a/RTCPStatsPlugin/.gitignore b/RTCPStatsPlugin/.gitignore new file mode 100644 index 0000000..b4b139f --- /dev/null +++ b/RTCPStatsPlugin/.gitignore @@ -0,0 +1,5 @@ +/target/ +/.classpath +/.project +/.settings/ + diff --git a/RTCPStatsPlugin/README.md b/RTCPStatsPlugin/README.md new file mode 100644 index 0000000..f41b80c --- /dev/null +++ b/RTCPStatsPlugin/README.md @@ -0,0 +1,112 @@ +# RTCP Stats Plugin + +This plugin extracts metrics from [RTCP Sender Reports](https://datatracker.ietf.org/doc/html/rfc3550#section-6.4.1) and provides real-time streaming statistics with **frame-level NTP timestamp interpolation**. + +## Features + +- **Frame-level timing interpolation**: Provides precise NTP timestamps for every video/audio frame by interpolating between RTCP Sender Reports +- **Automatic clock rate detection**: Dynamically detects RTP clock rates from RTCP data + +## Important Notes + +- The plugin requires at least **two RTCP Sender Reports** before interpolation begins. This may cause some delay from the start of the stream until timing data becomes available +- Sender Reports must be spaced at least **1.0 seconds** apart for reliable clock rate detection +- `ntpTime` values may appear negative due to Java's signed long representation - handle as unsigned 64-bit values in frontend + +## Requirements + +- Ant Media Server **Enterprise** edition +- Custom FFmpeg build (included in resources) +- **Important**: Time between RTCP Sender Reports must be at least **1.0 second** for accurate interpolation + +## Installation + +1. **Build the project**: + ```bash + ./redeploy.sh + ``` + +2. **Deploy the plugin**: + ```bash + cp ./target/rtcp-stats-plugin-*.jar /path/to/antmedia/plugins/ + ``` + +3. **Install custom FFmpeg build**: + ```bash + # Delete existing FFmpeg JARs from plugins (excluding platform JAR) + rm /path/to/antmedia/plugins/ffmpeg-*-linux-x86_64.jar /path/to/antmedia/plugins/ffmpeg-[0-9]*.jar + # Copy new FFmpeg binaries + cp ./src/main/resources/ffmpeg/build/* /path/to/antmedia/plugins/ + ``` + +## Usage + +Once installed, for every SRT broadcast, viewers receive interpolated timing data via data channel. The plugin waits for at least **two RTCP Sender Reports** (spaced 1+ seconds apart) to detect clock rates, then provides frame-level interpolated NTP timestamps for every packet: + +```json +{ + "EVENT_TYPE": "rtcpSr", + "trackIndex": 0, + "pts": 2457581, + "ntpTime": -1441529631683400689, + "srReceptionTime": 1381854143, + "srNtpTime": -1441529631683400689 +} +``` + +**Field descriptions:** +- `EVENT_TYPE`: Always "rtcpSr" +- `trackIndex`: Stream index (typically but not guaranted, 0 = video, 1+ = audio) +- `pts`: Presentation timestamp of the current packet +- `ntpTime`: **Interpolated** NTP timestamp for this specific frame/packet +- `srReceptionTime`: RTP timestamp from the most recent RTCP Sender Report +- `srNtpTime`: Original NTP timestamp from the most recent RTCP Sender Report + +### Usage with JS SDK +To access this data in JS-SDK, follow [example](https://github.com/ant-media/StreamApp/blob/93aba178622b72475d6be414eb71d09462149398/src/main/webapp/samples/publish_webrtc.html#L683), and handle events on WebRTCAdatptor callback. + +```javascript +// NTP timestamp parser function +function simpleNtpParse(ntpTimeString) { + let ntpTime = BigInt(ntpTimeString); + if (ntpTime < 0n) { + ntpTime = ntpTime + (1n << 64n); + } + + return { + seconds: Number(ntpTime >> 32n), + fraction: Number(ntpTime & 0xFFFFFFFFn) + }; +} + +new WebRTCAdaptor({ + ......... + ......... + dataChannelEnabled: true, + callback: (info, obj) => { + if (info === "data_received") { + try { + let notificationEvent = JSON.parse(data); + if(notificationEvent != null && typeof(notificationEvent) == "object") { + let eventType = notificationEvent.EVENT_TYPE; + if (eventType == "rtcpSr") { + // We have interpolated frame timing data here! + let ntpParts = simpleNtpParse(notificationEvent.ntpTime); + console.log(`Track ${notificationEvent.trackIndex}: PTS=${notificationEvent.pts}, NTP=${notificationEvent.ntpTime}`); + console.log(`NTP Seconds: ${ntpParts.seconds}, Fraction: ${ntpParts.fraction}`); + } + } + } catch (exception) { + $("#all-messages").append("Received: " + data + "
"); + } + } + } +``` + + + + + + + + diff --git a/RTCPStatsPlugin/mvn-settings.xml b/RTCPStatsPlugin/mvn-settings.xml new file mode 100644 index 0000000..3365e68 --- /dev/null +++ b/RTCPStatsPlugin/mvn-settings.xml @@ -0,0 +1,23 @@ + + + + ossrh + ${env.CI_DEPLOY_USERNAME} + ${env.CI_DEPLOY_PASSWORD} + + + + + + ossrh + + true + + + gpg + ${env.GPG_KEY_NAME} + ${env.GPG_PASSPHRASE} + + + + \ No newline at end of file diff --git a/RTCPStatsPlugin/pom.xml b/RTCPStatsPlugin/pom.xml new file mode 100644 index 0000000..a66594f --- /dev/null +++ b/RTCPStatsPlugin/pom.xml @@ -0,0 +1,257 @@ + + + io.antmedia + parent + 3.0.0-SNAPSHOT + + 4.0.0 + io.antmedia.plugin + RTCPStatsPlugin + jar + RTCPStatsPlugin + http://maven.apache.org + + UTF-8 + + + + Petar + petar.ostojic@antmedia.io + + + + + Apache 2 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + A business-friendly OSS license + + + + + + false + + central + Central Repository + https://repo.maven.apache.org/maven2 + + + sonatype-snapshots + Sonatype Snapshots + https://oss.sonatype.org/content/repositories/snapshots + + + sonatype-releases + Sonatype Releases + https://oss.sonatype.org/content/repositories/releases + + + + + ossrh + https://oss.sonatype.org/content/repositories/snapshots + + + ossrh + https://oss.sonatype.org/service/local/staging/deploy/maven2/ + + + + rtcp-stats-plugin + src/test/java + + + org.sonatype.plugins + nexus-staging-maven-plugin + 1.6.7 + true + + ossrh + https://oss.sonatype.org/ + true + + + + org.apache.maven.plugins + maven-source-plugin + + + attach-sources + + jar-no-fork + + + + + + org.apache.maven.plugins + maven-source-plugin + + + attach-sources + + jar-no-fork + + + + + + maven-jar-plugin + + + org.apache.felix + maven-bundle-plugin + true + + + org.apache.maven.plugins + maven-gpg-plugin + 1.6 + + + sign-artifacts + verify + + sign + + + + + + + + src/main/resources + + + src/main/java + + **/*.xml + + + + + + src/test/resources + + + src/main/webapp + + **/*.xml + **/*.properties + + + + + + + org.apache.tomcat.embed + tomcat-embed-websocket + ${tomcat.version} + provided + + + io.antmedia + ant-media-server + ${project.parent.version} + provided + + + io.antmedia.enterprise + ant-media-enterprise + ${project.parent.version} + provided + + + org.slf4j + slf4j-api + provided + + + org.apache.mina + mina-core + ${mina.version} + bundle + provided + + + org.springframework + spring-beans + ${spring.version} + provided + + + org.springframework + spring-context-support + provided + + + org.springframework + spring-test + ${spring.version} + provided + + + org.springframework + spring-context + provided + + + org.springframework + spring-core + provided + + + org.springframework + spring-expression + ${spring.version} + provided + + + org.springframework + spring-aop + ${spring.version} + provided + + + org.springframework + spring-web + provided + + + org.glassfish.jersey.media + jersey-media-json-jackson + ${jersey.version} + provided + + + org.glassfish.jersey.ext + jersey-spring6 + ${jersey.version} + provided + + + junit + junit + test + + + org.bytedeco + ffmpeg-platform + ${javacpp.ffmpeg.version} + provided + + + org.mockito + mockito-core + ${mockito-core.version} + test + + + RTCPStatsPlugin plugin for Ant Media Server + + Ant Media + http://antmedia.io + + diff --git a/RTCPStatsPlugin/redeploy.sh b/RTCPStatsPlugin/redeploy.sh new file mode 100755 index 0000000..8f13556 --- /dev/null +++ b/RTCPStatsPlugin/redeploy.sh @@ -0,0 +1,18 @@ +#!/bin/sh +AMS_DIR=~/softwares/ant-media-server +mvn clean install -Dmaven.javadoc.skip=true -Dmaven.test.skip=true -Dgpg.skip=true +OUT=$? + +if [ $OUT -ne 0 ]; then + exit $OUT +fi + +rm -rf $AMS_DIR/plugins/rtcp-stats-plugin* +cp target/rtcp-stats-plugin.jar $AMS_DIR/plugins/ + +OUT=$? + +if [ $OUT -ne 0 ]; then + exit $OUT +fi +#./start-debug.sh diff --git a/RTCPStatsPlugin/src/main/java/io/antmedia/app/RTCPStatsPacketListener.java b/RTCPStatsPlugin/src/main/java/io/antmedia/app/RTCPStatsPacketListener.java new file mode 100644 index 0000000..aa395e7 --- /dev/null +++ b/RTCPStatsPlugin/src/main/java/io/antmedia/app/RTCPStatsPacketListener.java @@ -0,0 +1,256 @@ +package io.antmedia.app; + +import io.antmedia.AntMediaApplicationAdapter; +import io.antmedia.enterprise.webrtc.WebRTCApplication; +import io.antmedia.enterprise.webrtc.datachannel.DataChannelConstants; +import io.antmedia.enterprise.webrtc.datachannel.DataChannelRouter; +import io.antmedia.plugin.RTCPStatsPlugin; +import io.antmedia.plugin.api.IPacketListener; +import io.antmedia.plugin.api.StreamParametersInfo; +import org.bytedeco.ffmpeg.avcodec.AVPacket; +import org.bytedeco.ffmpeg.avcodec.AVProducerReferenceTime; +import org.bytedeco.javacpp.BytePointer; +import org.bytedeco.javacpp.SizeTPointer; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; + +import static org.bytedeco.ffmpeg.global.avcodec.AV_PKT_DATA_PRFT; +import static org.bytedeco.ffmpeg.global.avcodec.av_packet_get_side_data; + +/* + * Algorithm: + * + * RTCP Sender Reports (SR) are sent on interval ~1-5 sec and contain synchronized NTP and RTP timestamps. + * This data is embeded in 'AVProducerReferenceTime' packet sidedata by custom FFMPEG build. + * To get NTP timestamp for every RTP frame, we interpolate on timing relationship established by data from TWO SRs + * + * How it works: + * 1. - Clock Rate Detection (requires TWO SRs): + * - Wait for first SR: store its NTP and RTP timestamps + * - Wait for second SR: calculate time differences between the two SRs + * - clock rate = RTP_time_diff / NTP_time_diff (Usually 90kHz - video, 48kHz - audio) + * + * 2. - Interpolation (uses most recent SR as reference): + * - For every RTP packet with timestamp Z, use most recent SR as reference point + * - Calculate: Z - SR_RTP_time = RTP time elapsed since last SR + * - Convert to seconds: elapsed_seconds = RTP_time_elapsed / detected_clock_rate + * - Interpolated NTP time = SR_NTP_time + elapsed_seconds + * + * !!!!!!!!!!!!!!!!!!!!!!!!!!! + * REMAINDER: Time between two SRs must be at least 1 sec apart for this to work + * Improvement 1: Re-Detect clock-rate periodically to account for possible clock drift + * Improvement 2: To have clock immidiately after first SR, we could do an less precise deteciton algorithm, and later switch to precise when SRs are received + * !!!!!!!!!!!!!!!!!!!!!!!!!!! + */ +public class RTCPStatsPacketListener implements IPacketListener { + + private static final Logger logger = LoggerFactory.getLogger(RTCPStatsPacketListener.class); + + private final String streamId; + private final WebRTCApplication appAdapter; + private final SizeTPointer sideDataPtr = new SizeTPointer(1); + + private final RTCPStatsPluginSettings settings; + + private final JSONObject jsonResponse = new JSONObject(); + + + private final Map lastSrNtpTime = new HashMap<>(); + private final Map lastSrRtpTime = new HashMap<>(); + private final Map detectedClockRate = new HashMap<>(); + + public RTCPStatsPacketListener(String streamId, AntMediaApplicationAdapter appAdapter, RTCPStatsPluginSettings settings) { + this.streamId = streamId; + this.appAdapter = (WebRTCApplication) appAdapter; + this.settings = settings; + } + + @Override + public AVPacket onVideoPacket(String streamId, AVPacket packet) { + if (!detectedClockRate.containsKey(packet.stream_index())) { + detectClockRateFromRtcp(packet); + return packet; + } + + processTimingInterpolation(streamId, packet); + return packet; + } + + @Override + public AVPacket onAudioPacket(String streamId, AVPacket packet) { + if (!detectedClockRate.containsKey(packet.stream_index())) { + detectClockRateFromRtcp(packet); + return packet; + } + + processTimingInterpolation(streamId, packet); + return packet; + } + + private void processTimingInterpolation(String streamId, AVPacket packet) { + sideDataPtr.put(0); + BytePointer sideData = av_packet_get_side_data(packet, AV_PKT_DATA_PRFT, sideDataPtr); + if (sideData == null || sideDataPtr.get(0) < 1) { + return; + } + + int streamIndex = packet.stream_index(); // Stream index (0 = video, 1+ = audio typically) + long rtpDetectedClockRate = detectedClockRate.getOrDefault(streamIndex, -1L); + if (rtpDetectedClockRate < 0) { + // Clock rate not detected yet + return; + } + + try { + AVProducerReferenceTime prft = new AVProducerReferenceTime(sideData); + + long rtcpNtpTime = prft.last_rtcp_ntp_time(); // NTP timestamp from RTCP Sender Report (64-bit: upper 32 bits = seconds since 1900, lower 32 bits = fraction) + long rtcpRtpTime = prft.last_rtcp_timestamp(); // RTP timestamp from RTCP Sender Report (32-bit RTP clock units) + long packetRtpTime = prft.last_rtp_timestamp(); // Current packet's RTP timestamp (32-bit RTP clock units) + if (rtcpNtpTime == 0 || rtcpRtpTime == 0) { + // No data... + return; + } + + // Calculate RTP timestamp difference + // Note: RTP timestamps are 32-bit unsigned values, but Java long is signed. + // We mask with 0xFFFFFFFFL to treat them as unsigned for proper arithmetic, + // then handle potential wraparound (RTP timestamps wrap from 0xFFFFFFFF back to 0x00000000) + long rtpDiff = (packetRtpTime & 0xFFFFFFFFL) - (rtcpRtpTime & 0xFFFFFFFFL); + rtpDiff = handleRtpTimestampWraparound(rtpDiff); + + // RTP ticks to seconds + double timeDiffSeconds = (double) rtpDiff / rtpDetectedClockRate; + + // Convert time difference to NTP fraction units (2^32 fractions per second) + // and and add to the RTCP NTP timestamp + long ntpFractionDiff = (long) (timeDiffSeconds * 4294967296.0); + long interpolatedNtpTime = rtcpNtpTime + ntpFractionDiff; + + + jsonResponse.clear(); + jsonResponse.put(DataChannelConstants.EVENT_TYPE, RTCPStatsPlugin.RTCP_SENDER_REPORT_EVENT); + jsonResponse.put("trackIndex", streamIndex); + jsonResponse.put("pts", packet.pts()); + jsonResponse.put("ntpTime", interpolatedNtpTime); + jsonResponse.put("srReceptionTime", rtcpRtpTime); + jsonResponse.put("srNtpTime", rtcpNtpTime); + + byte[] dataBytes = jsonResponse.toJSONString().getBytes(Charset.defaultCharset()); + DataChannelRouter dataChannelRouter = appAdapter.getDataChannelRouter(); + dataChannelRouter.publisherMessageReceived(streamId, dataBytes, false); + + + } catch (Exception e) { + logger.warn("Error in timing interpolation for stream: {} - {}", streamId, e.getMessage()); + } + } + + private void detectClockRateFromRtcp(AVPacket packet) { + sideDataPtr.put(0); + BytePointer sideData = av_packet_get_side_data(packet, AV_PKT_DATA_PRFT, sideDataPtr); + if (sideData == null || sideDataPtr.get(0) < 1) { + return; + } + + AVProducerReferenceTime prft = new AVProducerReferenceTime(sideData); + + long currentNtpTime = prft.last_rtcp_ntp_time(); // NTP timestamp from RTCP Sender Report (64-bit: upper 32 bits = seconds since 1900, lower 32 bits = fraction) + long currentRtpTime = prft.last_rtcp_timestamp(); // RTP timestamp from RTCP Sender Report (32-bit RTP clock units) + int trackIndex = packet.stream_index(); + + Long lastNtp = lastSrNtpTime.get(trackIndex); + Long lastRtp = lastSrRtpTime.get(trackIndex); + + lastSrNtpTime.put(trackIndex, currentNtpTime); + lastSrRtpTime.put(trackIndex, currentRtpTime); + + if (lastNtp == null || lastRtp == null) { + return; + } + + if (currentNtpTime == lastNtp && currentRtpTime == lastRtp) { + return; // We have received only one SR + } + + try { + // Calculate NTP time difference in seconds + double ntpDiffSeconds = convertNtpDiffToSeconds(currentNtpTime - lastNtp); + + // Calculate RTP time difference + long rtpDiff = handleRtpTimestampWraparound(currentRtpTime - lastRtp); + + // SR time diff is too small (unreliable) + if (ntpDiffSeconds < 1.0) { + logger.warn("NTP time difference too small ({} s), waiting for larger interval", String.format("%.3f", ntpDiffSeconds)); + return; + } + + double effectiveClockRate = Math.abs(rtpDiff) / ntpDiffSeconds; + long detectedRate = Math.round(effectiveClockRate); + if (detectedRate < 1000 || detectedRate > 1000000) { + logger.warn("Detected unreasonable clock rate ({} Hz) for stream {}, ignoring", detectedRate, trackIndex); + return; + } + + detectedClockRate.put(trackIndex, detectedRate); + + logger.info("=== RTCP SR CLOCK RATE DETECTION === Stream: {} Track: {}", streamId, trackIndex); + logger.info("Detected RTP clock rate: {} Hz", detectedRate); + logger.info("Based on RTCP SR interval: {} s, RTP diff: {}", String.format("%.3f", ntpDiffSeconds), rtpDiff); + logger.info("Previous SR - NTP: {}, RTP: {}", lastNtp, lastRtp); + logger.info("Current SR - NTP: {}, RTP: {}", currentNtpTime, currentRtpTime); + logger.info("=== DETECTION COMPLETE ==="); + + } catch (Exception e) { + logger.warn("Error detecting clock rate from RTCP SR for stream {}: {}", trackIndex, e.getMessage()); + } + } + + private double convertNtpDiffToSeconds(long ntpDiff) { + // NTP timestamp: upper 32 bits = seconds, lower 32 bits = fraction + // Convert the 64-bit NTP difference to seconds as double + + // Extract seconds and fraction parts + long secondsPart = ntpDiff >> 32; // Signed shift for seconds + long fractionPart = ntpDiff & 0xFFFFFFFFL; // Unsigned lower 32 bits + + // Convert fraction to decimal (2^32 fractions per second) + double fractionSeconds = fractionPart / 4294967296.0; + return secondsPart + fractionSeconds; + } + + /** + * Handles 32-bit RTP timestamp wraparound by correcting the difference. + * RTP timestamps are 32-bit values that can wrap around from 0xFFFFFFFF to 0x00000000. + */ + private long handleRtpTimestampWraparound(long rtpDiff) { + // If difference > 2^31, we wrapped backward (subtract 2^32) + // If difference < -2^31, we wrapped forward (add 2^32) + if (rtpDiff > 2147483648L) { + // backward wraparound + rtpDiff -= 4294967296L; + } else if (rtpDiff < -2147483648L) { + // forward wraparound + rtpDiff += 4294967296L; + } + return rtpDiff; + } + + @Override + public void setVideoStreamInfo(String streamId, StreamParametersInfo videoStreamInfo) { + } + + @Override + public void setAudioStreamInfo(String streamId, StreamParametersInfo audioStreamInfo) { + } + + @Override + public void writeTrailer(String streamId) { + } +} \ No newline at end of file diff --git a/RTCPStatsPlugin/src/main/java/io/antmedia/app/RTCPStatsPluginSettings.java b/RTCPStatsPlugin/src/main/java/io/antmedia/app/RTCPStatsPluginSettings.java new file mode 100644 index 0000000..3f292a8 --- /dev/null +++ b/RTCPStatsPlugin/src/main/java/io/antmedia/app/RTCPStatsPluginSettings.java @@ -0,0 +1,4 @@ +package io.antmedia.app; + +public class RTCPStatsPluginSettings { +} diff --git a/RTCPStatsPlugin/src/main/java/io/antmedia/plugin/RTCPStatsPlugin.java b/RTCPStatsPlugin/src/main/java/io/antmedia/plugin/RTCPStatsPlugin.java new file mode 100644 index 0000000..7451b62 --- /dev/null +++ b/RTCPStatsPlugin/src/main/java/io/antmedia/plugin/RTCPStatsPlugin.java @@ -0,0 +1,112 @@ +package io.antmedia.plugin; + +import com.google.gson.Gson; +import io.antmedia.app.RTCPStatsPluginSettings; +import io.antmedia.datastore.db.types.Broadcast; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.stereotype.Component; + +import io.antmedia.AntMediaApplicationAdapter; +import io.antmedia.app.RTCPStatsPacketListener; +import io.antmedia.muxer.IAntMediaStreamHandler; +import io.antmedia.plugin.api.IStreamListener; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@Component(value="plugin.rtcp-stats") +public class RTCPStatsPlugin implements ApplicationContextAware, IStreamListener { + + private static Logger logger = LoggerFactory.getLogger(RTCPStatsPlugin.class); + + public final static String RTCP_SENDER_REPORT_EVENT = "rtcpSr"; + + private ApplicationContext applicationContext; + private RTCPStatsPluginSettings settings; + + private final Map packetListeners = new ConcurrentHashMap<>(); + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + + IAntMediaStreamHandler app = getApplication(); + app.addStreamListener(this); + + refreshSettings(); + logger.info("Initialized!"); + } + + @Override + public void streamStarted(String streamId) { + logger.info("Stream started: {}", streamId); + + String streamUrl = getStreamUrl(streamId); + if (streamUrl == null || (!streamUrl.startsWith("rtsp://") && !streamUrl.startsWith("rtp://"))) { + return; + } + + AntMediaApplicationAdapter app = getApplication(); + if (!app.isDataChannelEnabled()) { + return; + } + + RTCPStatsPacketListener packetListener = new RTCPStatsPacketListener(streamId, app, settings); + if (app.addPacketListener(streamId, packetListener)) { + packetListeners.put(streamId, packetListener); + logger.info("Registered rtcp stats listener for stream: {}", streamId); + } else { + logger.warn("Failed to register rtcp stats listener for stream: {}", streamId); + } + } + + @Override + public void streamFinished(String streamId) { + RTCPStatsPacketListener packetListener = packetListeners.remove(streamId); + if (packetListener == null) { + return; + } + + IAntMediaStreamHandler app = getApplication(); + app.removePacketListener(streamId, packetListener); + logger.info("Removed rtcp stats listener for stream: {}", streamId); + } + + @Override + public void joinedTheRoom(String roomId, String streamId) { + } + + @Override + public void leftTheRoom(String roomId, String streamId) { + } + + private AntMediaApplicationAdapter getApplication() { + return (AntMediaApplicationAdapter) applicationContext.getBean(AntMediaApplicationAdapter.BEAN_NAME); + } + + private String getStreamUrl(String streamId) { + AntMediaApplicationAdapter antApp = getApplication(); + Broadcast broadcast = antApp.getDataStore().get(streamId); + return broadcast != null ? broadcast.getStreamUrl() : null; + } + + private void refreshSettings() { + try { + Object customSetting = getApplication().getAppSettings().getCustomSetting("plugin.rtcp-stats"); + if (customSetting != null) { + Gson gson = new Gson(); + this.settings = gson.fromJson(customSetting.toString(), RTCPStatsPluginSettings.class); + } + } catch (Exception e) { + logger.error("Error refreshing plugin settings, using defaults", e); + } finally { + if (this.settings == null) { + this.settings = new RTCPStatsPluginSettings(); + } + } + } +} \ No newline at end of file diff --git a/RTCPStatsPlugin/src/main/resources/build/ffmpeg-7.1-1.5.11-linux-x86_64.jar b/RTCPStatsPlugin/src/main/resources/build/ffmpeg-7.1-1.5.11-linux-x86_64.jar new file mode 100644 index 0000000..1570d79 Binary files /dev/null and b/RTCPStatsPlugin/src/main/resources/build/ffmpeg-7.1-1.5.11-linux-x86_64.jar differ diff --git a/RTCPStatsPlugin/src/main/resources/build/ffmpeg-7.1-1.5.11.jar b/RTCPStatsPlugin/src/main/resources/build/ffmpeg-7.1-1.5.11.jar new file mode 100644 index 0000000..e10a205 Binary files /dev/null and b/RTCPStatsPlugin/src/main/resources/build/ffmpeg-7.1-1.5.11.jar differ diff --git a/RTCPStatsPlugin/src/main/resources/rtcp_ntp_prft.patch b/RTCPStatsPlugin/src/main/resources/rtcp_ntp_prft.patch new file mode 100644 index 0000000..b7d8aad --- /dev/null +++ b/RTCPStatsPlugin/src/main/resources/rtcp_ntp_prft.patch @@ -0,0 +1,62 @@ +diff --git a/libavcodec/defs.h b/libavcodec/defs.h +index 1234567..8901234 100644 +--- a/libavcodec/defs.h ++++ b/libavcodec/defs.h +@@ -324,6 +324,11 @@ typedef struct AVProducerReferenceTime { + */ + int64_t wallclock; + int flags; ++ uint64_t last_rtcp_ntp_time; ++ uint64_t last_rtcp_reception_time; ++ uint32_t last_rtcp_packet_count; ++ uint32_t last_rtp_timestamp; ++ uint32_t last_rtcp_timestamp; + } AVProducerReferenceTime; + + /** + +diff --git a/libavformat/rtpdec.h b/libavformat/rtpdec.h +index 1234567..8901234 100644 +--- a/libavformat/rtpdec.h ++++ b/libavformat/rtpdec.h +@@ -176,6 +176,7 @@ typedef struct RTPDemuxContext { + uint32_t last_rtcp_timestamp; + int64_t rtcp_ts_offset; ++ uint32_t last_rtcp_sr_packet_count; + + /* rtcp sender statistics */ + unsigned int packet_count; + +diff --git a/libavformat/rtpdec.c b/libavformat/rtpdec.c +index 1234567..8901234 100644 +--- a/libavformat/rtpdec.c ++++ b/libavformat/rtpdec.c +@@ -190,7 +190,7 @@ static int rtcp_parse_packet(RTPDemuxContext *s, const unsigned char *buf, + + switch (buf[1]) { + case RTCP_SR: +- if (payload_len < 20) { ++ if (payload_len < 24) { + av_log(s->ic, AV_LOG_ERROR, "Invalid RTCP SR packet length\n"); + return AVERROR_INVALIDDATA; + } +@@ -198,6 +198,8 @@ static int rtcp_parse_packet(RTPDemuxContext *s, const unsigned char *buf, + s->last_rtcp_reception_time = av_gettime_relative(); + s->last_rtcp_ntp_time = AV_RB64(buf + 8); + s->last_rtcp_timestamp = AV_RB32(buf + 16); ++ if (payload_len >= 24) ++ s->last_rtcp_sr_packet_count = AV_RB32(buf + 20); + if (s->first_rtcp_ntp_time == AV_NOPTS_VALUE) { + s->first_rtcp_ntp_time = s->last_rtcp_ntp_time; + if (!s->base_timestamp) +@@ -647,6 +649,11 @@ static int rtp_set_prft(RTPDemuxContext *s, AVPacket *pkt, uint32_t timestamp) + + prft->wallclock = rtcp_time + delta_time; + prft->flags = 24; ++ prft->last_rtcp_ntp_time = s->last_rtcp_ntp_time; ++ prft->last_rtcp_reception_time = s->last_rtcp_reception_time; ++ prft->last_rtcp_packet_count = s->last_rtcp_sr_packet_count; ++ prft->last_rtcp_timestamp = s->last_rtcp_timestamp; ++ prft->last_rtp_timestamp = timestamp; + return 0; + } diff --git a/RTCPStatsPlugin/src/test/java/io/antmedia/test/plugin/RTCPStatsPluginTest.java b/RTCPStatsPlugin/src/test/java/io/antmedia/test/plugin/RTCPStatsPluginTest.java new file mode 100644 index 0000000..abb41d8 --- /dev/null +++ b/RTCPStatsPlugin/src/test/java/io/antmedia/test/plugin/RTCPStatsPluginTest.java @@ -0,0 +1,190 @@ +package io.antmedia.test.plugin; + +import io.antmedia.AntMediaApplicationAdapter; +import io.antmedia.AppSettings; +import io.antmedia.app.RTCPStatsPacketListener; +import io.antmedia.datastore.db.DataStore; +import io.antmedia.datastore.db.types.Broadcast; +import io.antmedia.enterprise.webrtc.WebRTCApplication; +import io.antmedia.enterprise.webrtc.datachannel.DataChannelRouter; +import io.antmedia.plugin.RTCPStatsPlugin; +import org.bytedeco.ffmpeg.avcodec.AVPacket; +import org.bytedeco.ffmpeg.avcodec.AVProducerReferenceTime; +import org.bytedeco.javacpp.BytePointer; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.springframework.context.ApplicationContext; + +import static org.bytedeco.ffmpeg.global.avcodec.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +public class RTCPStatsPluginTest { + + private RTCPStatsPlugin plugin; + private ApplicationContext applicationContext; + private WebRTCApplication app; + private AppSettings appSettings; + private DataStore dataStore; + private DataChannelRouter dataChannelRouter; + + @Before + public void setUp() { + plugin = Mockito.spy(new RTCPStatsPlugin()); + applicationContext = Mockito.mock(ApplicationContext.class); + app = Mockito.mock(WebRTCApplication.class); + appSettings = new AppSettings(); + dataStore = Mockito.mock(DataStore.class); + dataChannelRouter = Mockito.mock(DataChannelRouter.class); + + when(applicationContext.getBean(AntMediaApplicationAdapter.BEAN_NAME)).thenReturn(app); + when(app.getAppSettings()).thenReturn(appSettings); + when(app.getDataStore()).thenReturn(dataStore); + when(app.getDataChannelRouter()).thenReturn(dataChannelRouter); + + plugin.setApplicationContext(applicationContext); + } + + @Test + public void testStreamStartedWithDifferentUrls() { + when(app.isDataChannelEnabled()).thenReturn(true); + when(app.addPacketListener(anyString(), any(RTCPStatsPacketListener.class))).thenReturn(true); + + Broadcast rtspBroadcast = new Broadcast(); + rtspBroadcast.setStreamUrl("rtsp://example.com/stream"); + + Broadcast rtpBroadcast = new Broadcast(); + rtpBroadcast.setStreamUrl("rtp://example.com/stream"); + + Broadcast httpBroadcast = new Broadcast(); + httpBroadcast.setStreamUrl("http://example.com/stream"); + + when(dataStore.get("httpStream")).thenReturn(httpBroadcast); + when(dataStore.get("rtspStream")).thenReturn(rtspBroadcast); + when(dataStore.get("rtpStream")).thenReturn(rtpBroadcast); + + // Test RTSP - should register packet listener + plugin.streamStarted("rtspStream"); + verify(app, times(1)).addPacketListener(eq("rtspStream"), any(RTCPStatsPacketListener.class)); + + // Test RTP - should register packet listener + plugin.streamStarted("rtpStream"); + verify(app, times(1)).addPacketListener(eq("rtpStream"), any(RTCPStatsPacketListener.class)); + + // Test non-RTSP/RTP URL - should NOT register packet listener + plugin.streamStarted("httpStream"); + verify(app, times(0)).addPacketListener(eq("httpStream"), any(RTCPStatsPacketListener.class)); + } + + @Test + public void testStreamStartedWithNullUrl() { + when(dataStore.get("testStream")).thenReturn(null); + + plugin.streamStarted("testStream"); + + verify(app, times(0)).addPacketListener(anyString(), any(RTCPStatsPacketListener.class)); + } + + @Test + public void testStreamStartedWithDataChannelDisabled() { + Broadcast broadcast = new Broadcast(); + broadcast.setStreamUrl("rtsp://example.com/stream"); + + when(app.isDataChannelEnabled()).thenReturn(false); + when(dataStore.get("testStream")).thenReturn(broadcast); + + plugin.streamStarted("testStream"); + + verify(app, times(0)).addPacketListener(anyString(), any(RTCPStatsPacketListener.class)); + } + + @Test + public void testStreamFinished() { + Broadcast broadcast = new Broadcast(); + broadcast.setStreamUrl("rtsp://example.com/stream"); + + when(app.isDataChannelEnabled()).thenReturn(true); + when(dataStore.get("testStream")).thenReturn(broadcast); + when(app.addPacketListener(anyString(), any(RTCPStatsPacketListener.class))).thenReturn(true); + + plugin.streamStarted("testStream"); + verify(app, times(1)).addPacketListener(anyString(), any(RTCPStatsPacketListener.class)); + + plugin.streamFinished("testStream"); + verify(app, times(1)).removePacketListener(anyString(), any(RTCPStatsPacketListener.class)); + + plugin.streamFinished("notExistingStrem"); + // Times should basically be 0, here, but we put 1, since verify will count + // all the calls from start of the test + verify(app, times(1)).removePacketListener(anyString(), any(RTCPStatsPacketListener.class)); + } + + @Test + public void testGetStreamUrl() { + Broadcast broadcast = new Broadcast(); + broadcast.setStreamUrl("rtsp://example.com/stream"); + + when(dataStore.get("testStream")).thenReturn(broadcast); + + // should trigger getUrl + plugin.streamStarted("testStream"); + + verify(dataStore, times(1)).get("testStream"); + } + + @Test + public void testGetStreamUrlWithNoBroadcast() { + when(dataStore.get("testStream")).thenReturn(null); + + plugin.streamStarted("testStream"); + + verify(dataStore, times(1)).get("testStream"); + verify(app, times(0)).addPacketListener(anyString(), any(RTCPStatsPacketListener.class)); + } + + @Test + public void testRefreshSettingsWithCustomSettings() { + appSettings.setCustomSetting("plugin.rtcp-stats", "{\"updateOnlyOnNewSR\":false}"); + + RTCPStatsPlugin newPlugin = Mockito.spy(new RTCPStatsPlugin()); + newPlugin.setApplicationContext(applicationContext); + + // Settings are refreshed during initialization + // 2 times, once for setUp, once for this test + verify(app, times(2)).getAppSettings(); + } + + @Test + public void testOnVideoPacketTest() { + // Packet data mocking stuff... + AVPacket packet = new AVPacket(); + av_new_packet(packet, 600); + packet.stream_index(0); + packet.pts(12345L); + + AVProducerReferenceTime prft = new AVProducerReferenceTime(); + prft.last_rtcp_ntp_time(656899696L); + prft.last_rtcp_reception_time(777117717L); + prft.last_rtcp_packet_count(66); + + BytePointer sideDataPtr = new BytePointer(prft); + av_packet_add_side_data(packet, AV_PKT_DATA_PRFT, sideDataPtr, prft.sizeof()); + + + when(app.isDataChannelEnabled()).thenReturn(true); + io.antmedia.app.RTCPStatsPluginSettings settings = new io.antmedia.app.RTCPStatsPluginSettings(); + RTCPStatsPacketListener packetListener = new RTCPStatsPacketListener("testStream", app, settings); + + // Verify if msg publish was called for video + packetListener.onVideoPacket("testStream", packet); + verify(dataChannelRouter, times(1)).publisherMessageReceived(eq("testStream"), any(byte[].class), eq(false)); + + // And for audio + prft.last_rtcp_ntp_time(996899696L); + prft.last_rtcp_reception_time(997117717L); + prft.last_rtcp_packet_count(99); + packetListener.onAudioPacket("testStream", packet); + verify(dataChannelRouter, times(2)).publisherMessageReceived(eq("testStream"), any(byte[].class), eq(false)); + } +} \ No newline at end of file