Skip to content

Commit

Permalink
Merge branch 'master' into removeSubtrackPoller
Browse files Browse the repository at this point in the history
  • Loading branch information
burak-58 committed Feb 23, 2025
2 parents 1cca338 + d09668f commit 096bff8
Show file tree
Hide file tree
Showing 19 changed files with 322 additions and 70 deletions.
10 changes: 10 additions & 0 deletions owasp-suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -142,5 +142,15 @@
<packageUrl regex="true">^pkg:maven/org\.bytedeco/ffmpeg@.*$</packageUrl>
<vulnerabilityName>CVE-2023-6603</vulnerabilityName>
</suppress>
<suppress>
<notes><![CDATA[
file name: ffmpeg-7.1-1.5.11.jar
file name: ffmpeg-7.1-1.5.11-linux-arm64.jar
We're not using immersive audio so that this vulnerability does not have an impact to Ant Media Server. When the fix is available,
we'll also update the FFmpeg version in Ant Media Server.
]]></notes>
<packageUrl regex="true">^pkg:maven/org\.bytedeco/ffmpeg@.*$</packageUrl>
<vulnerabilityName>CVE-2025-25469</vulnerabilityName>
</suppress>

</suppressions>
4 changes: 1 addition & 3 deletions src/main/java/io/antmedia/datastore/db/types/Broadcast.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
@Index(fields = @Field("status")) })
public class Broadcast {

private static final Logger logger = LoggerFactory.getLogger(Broadcast.class);

@JsonIgnore
@Id
private ObjectId dbId;
Expand Down Expand Up @@ -252,7 +250,7 @@ public class Broadcast {
* rest service or management console It is false by default
*
*/
@Schema(description ="is true, if a broadcast that is not added to data store through rest service or management console It is false by default")
@Schema(description ="This value is true when a broadcast is created directly through streaming without created earlier through rest service. It is false by default")
private boolean zombi = false;

/**
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/antmedia/datastore/db/types/VoD.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class VoD implements Serializable {
@Schema(description = "The size of the VoD file in bytes")
private long fileSize;

@Schema(description = "The path of the VoD")
@Schema(description = "The relative path of the VoD after app name such as streams/{vodId}.mp4")
private String filePath;

@Schema(description = "The id of the VoD")
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/antmedia/muxer/IAntMediaStreamHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ public interface IAntMediaStreamHandler {
* @param file video file that muxed is finished
* @param duration of the video in milliseconds
* @param resolution height of the video
* @param previewFilePath path of the preview file
*
*/
public void muxingFinished(Broadcast broadcast, String streamId, File file, long startTime, long duration , int resolution, String path, String vodId);
public void muxingFinished(Broadcast broadcast, String streamId, File file, long startTime, long duration , int resolution, String previewFilePath, String vodId);

/**
* Update stream quality, speed and number of pending packet size and update time
Expand Down
44 changes: 26 additions & 18 deletions src/main/java/io/antmedia/muxer/MuxAdaptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public class MuxAdaptor implements IRecordingListener, IEndpointStatusListener {
private boolean firstKeyFrameReceivedChecked = false;
private long lastKeyFramePTS =0;
protected String streamId;
protected long startTime;
private long startTime;

protected IScope scope;

Expand Down Expand Up @@ -957,7 +957,6 @@ public void addStream2Muxers(AVCodecParameters codecParameters, AVRational rat,
}
}

startTime = System.currentTimeMillis();
}

public void prepareMuxerIO()
Expand All @@ -975,7 +974,6 @@ public void prepareMuxerIO()
}
}
}
startTime = System.currentTimeMillis();

}

Expand Down Expand Up @@ -1362,11 +1360,7 @@ public void execute()

if (stopRequestExist) {
logger.info("Stop request exists for stream:{}", streamId);
broadcastStream.removeStreamListener(MuxAdaptor.this);
logger.warn("closing adaptor for {} ", streamId);
closeResources();
logger.warn("closed adaptor for {}", streamId);
getStreamHandler().stopPublish(streamId);
clearAndStopStream();
//finally code execute and reset the isPipeReaderJobRunning
return;

Expand Down Expand Up @@ -1439,7 +1433,6 @@ public void execute()

updateQualityParameters(dts, TIME_BASE_FOR_MS, 0, isKeyFrame);


if (bufferTimeMs == 0)
{
writeStreamPacket(packet);
Expand All @@ -1450,13 +1443,23 @@ else if (bufferTimeMs > 0)
}

}

long now = System.currentTimeMillis();
//check that at least timeout period has passed since last update time
if ((now - startTime) > AntMediaApplicationAdapter.STREAM_TIMEOUT_MS &&
(now - lastQualityUpdateTime) > AntMediaApplicationAdapter.STREAM_TIMEOUT_MS)
{
//It's not updated for timeout period, it means that stream is not sending packets and it is accepted as offline
//close Rtmp Connection
logger.info("Closing Rtmp Connection because it's not updated for {}ms last update time:{} for stream:{}", AntMediaApplicationAdapter.STREAM_TIMEOUT_MS, lastQualityUpdateTime, streamId);
closeRtmpConnection();
stopRequestExist = true;
}


if (stopRequestExist) {
broadcastStream.removeStreamListener(MuxAdaptor.this);
logger.warn("closing adaptor for {} ", streamId);
closeResources();
logger.warn("closed adaptor for {}", streamId);
getStreamHandler().stopPublish(streamId);
logger.info("Stop request exists for stream:{}", streamId);
clearAndStopStream();
}
}
finally {
Expand All @@ -1466,6 +1469,14 @@ else if (bufferTimeMs > 0)
}
}
}

public void clearAndStopStream() {
broadcastStream.removeStreamListener(MuxAdaptor.this);
logger.warn("closing adaptor for {} ", streamId);
closeResources();
logger.warn("closed adaptor for {}", streamId);
getStreamHandler().stopPublish(streamId);
}


public void addBufferQueue(IStreamPacket packet) {
Expand Down Expand Up @@ -1830,6 +1841,7 @@ public boolean directMuxingSupported() {
public void start() {
logger.info("Number of items in the queue while adaptor is being started to prepare is {}", getInputQueueSize());

startTime = System.currentTimeMillis();
vertx.executeBlocking(() -> {
logger.info("before prepare for {}", streamId);
Boolean successful = false;
Expand Down Expand Up @@ -2124,10 +2136,6 @@ public boolean isPreviewOverwrite() {
return previewOverwrite;
}

public long getStartTime() {
return startTime;
}


public void setStartTime(long startTime) {
this.startTime = startTime;
Expand Down
28 changes: 10 additions & 18 deletions src/main/java/io/antmedia/rest/BroadcastRestService.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
package io.antmedia.rest;

import java.net.URI;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.UriInfo;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.json.simple.JSONObject;
import org.springframework.stereotype.Component;

import com.amazonaws.util.Base32;
Expand All @@ -25,7 +18,6 @@
import io.antmedia.datastore.db.DataStore;
import io.antmedia.datastore.db.types.Broadcast;
import io.antmedia.datastore.db.types.BroadcastUpdate;
import io.antmedia.datastore.db.types.Broadcast.PlayListItem;
import io.antmedia.datastore.db.types.ConferenceRoom;
import io.antmedia.datastore.db.types.ConnectionEvent;
import io.antmedia.datastore.db.types.Endpoint;
Expand All @@ -49,9 +41,19 @@
import io.antmedia.statistic.type.WebRTCVideoSendStats;
import io.antmedia.streamsource.StreamFetcher;
import io.antmedia.webrtc.api.IWebRTCAdaptor;
import io.swagger.v3.oas.annotations.ExternalDocumentation;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.info.Contact;
import io.swagger.v3.oas.annotations.info.Info;
import io.swagger.v3.oas.annotations.info.License;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.responses.ApiResponses;
import io.swagger.v3.oas.annotations.servers.Server;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.GET;
Expand All @@ -64,16 +66,6 @@
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.Status;
import io.swagger.v3.oas.annotations.info.Info;
import io.swagger.v3.oas.annotations.info.Contact;
import io.swagger.v3.oas.annotations.info.License;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.responses.ApiResponses;
import io.swagger.v3.oas.annotations.ExternalDocumentation;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.servers.Server;

@OpenAPIDefinition(
info = @Info(
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/io/antmedia/rest/RestServiceBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -501,8 +501,8 @@ protected Result updateStreamSource(String streamId, BroadcastUpdate updatedBroa

//Stop if it's streaming and type is not playlist
if (isStreamingActive && !isPlayList) {
boolean resultStopStreaming = checkStopStreaming(broadcastInDB);
waitStopStreaming(broadcastInDB, resultStopStreaming);
checkStopStreaming(broadcastInDB);
waitStopStreaming(broadcastInDB);
}

if (AntMediaApplicationAdapter.IP_CAMERA.equals(broadcastInDB.getType()) &&
Expand Down Expand Up @@ -534,12 +534,13 @@ protected Result updateStreamSource(String streamId, BroadcastUpdate updatedBroa

}


removeEmptyPlayListItems(updatedBroadcast.getPlayListItemList());

updatePlayListItemDurationsIfApplicable(updatedBroadcast.getPlayListItemList(), updatedBroadcast.getStreamId());


//don't update the status - this is the fix for this issue -> https://github.com/ant-media/Ant-Media-Server/issues/7055
//test code is ConsoleAppRestServiceTest#testUpdateStreamSourceDoesnotRestart
updatedBroadcast.setStatus(null);
boolean result = getDataStore().updateBroadcastFields(streamId, updatedBroadcast);

if (result) {
Expand All @@ -556,7 +557,6 @@ protected Result updateStreamSource(String streamId, BroadcastUpdate updatedBroa
//start streaming again if it was streaming and it's not Playlist
Broadcast fetchedBroadcast = getDataStore().get(streamId);
getApplication().startStreaming(fetchedBroadcast);

}
}

Expand All @@ -582,12 +582,12 @@ else if(getApplication().getStreamFetcherManager().isStreamRunning(broadcast)) {
}
}

public boolean waitStopStreaming(Broadcast broadcast, Boolean resultStopStreaming) {
public boolean waitStopStreaming(Broadcast broadcast) {

int i = 0;
int waitPeriod = 250;
// Broadcast status finished is not enough to be sure about broadcast's status.
while (!IAntMediaStreamHandler.BROADCAST_STATUS_FINISHED.equals(getDataStore().get(broadcast.getStreamId()).getStatus()) && !resultStopStreaming.equals(true)) {
while (!IAntMediaStreamHandler.BROADCAST_STATUS_FINISHED.equals(getDataStore().get(broadcast.getStreamId()).getStatus())) {
try {
i++;
logger.info("Waiting for stop broadcast: {} Total wait time: {}ms", broadcast.getStreamId() , i*waitPeriod);
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/antmedia/streamsource/StreamFetcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -687,8 +687,8 @@ public void close(AVPacket pkt) {
}
else
{
logger.info("Stream fetcher will not try again for streamUrl:{} because stopRequestReceived:{} and restartStream:{}",
streamUrl, stopRequestReceived, restartStream);
logger.info("Stream fetcher will not try again for streamUrl:{} and streamId:{} because stopRequestReceived:{} and restartStream:{}",
streamUrl, streamId, stopRequestReceived, restartStream);

if (!closeCalled) {
getInstance().closeBroadcast(streamId);
Expand Down
41 changes: 33 additions & 8 deletions src/main/java/io/antmedia/streamsource/StreamFetcherManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,18 @@ public boolean isStreamRunning(Broadcast broadcast) {
isStreamLive = true;
}

if (!isStreamLive) {
if (!isStreamLive)
{
//this stream may be fetching in somewhere in the cluster
isStreamLive = AntMediaApplicationAdapter.isStreaming(broadcast.getStatus()) &&
AntMediaApplicationAdapter.isInstanceAlive(broadcast.getOriginAdress(), serverSettings.getHostAddress(), serverSettings.getDefaultHttpPort(), scope.getName());

boolean isStatusStreaming = AntMediaApplicationAdapter.isStreaming(broadcast.getStatus());
boolean isInstanceRunning = false;
if (isStatusStreaming) {
isInstanceRunning = AntMediaApplicationAdapter.isInstanceAlive(broadcast.getOriginAdress(), serverSettings.getHostAddress(), serverSettings.getDefaultHttpPort(), scope.getName());
}
isStreamLive = isStatusStreaming && isInstanceRunning;

logger.info("Stream is live:{}, originInstanceRunning:{} and streamId:{}", isStreamLive, isInstanceRunning, broadcast.getStreamId());
}

return isStreamLive;
Expand Down Expand Up @@ -493,12 +501,17 @@ private void scheduleStreamFetcherJob() {

public boolean isToBeStoppedAutomatically(Broadcast broadcast)
{

boolean timeout = broadcast.getStartTime() != 0 && (System.currentTimeMillis() > (broadcast.getStartTime() + streamCheckerIntervalMs));

// broadcast autoStartEnabled and there is nobody watching and it's started more than streamCheckerIntervalMs ago
logger.info("broadcast is autoStartStopEnabled:{} isAnyonewatching:{} startTime:{} streamCheckerIntervalMs:{}",
broadcast.isAutoStartStopEnabled(), broadcast.isAnyoneWatching(), broadcast.getStartTime(), streamCheckerIntervalMs);
boolean isTobeStopped = broadcast.isAutoStartStopEnabled() && !broadcast.isAnyoneWatching() && timeout;

return broadcast.isAutoStartStopEnabled() && !broadcast.isAnyoneWatching() &&
broadcast.getStartTime() != 0 && (System.currentTimeMillis() > (broadcast.getStartTime() + streamCheckerIntervalMs));
logger.info("Stream:{} isToBeStoppedAutomatically decision is {} - details autoStartStopEnabled:{} isAnyonewatching:{} timeout:{} streamCheckerIntervalMs:{}",
broadcast.getStreamId(), isTobeStopped, broadcast.isAutoStartStopEnabled(), broadcast.isAnyoneWatching(), timeout, streamCheckerIntervalMs);

return isTobeStopped;

}

public void controlStreamFetchers(boolean restart) {
Expand Down Expand Up @@ -535,8 +548,19 @@ public void controlStreamFetchers(boolean restart) {
logger.info("Stream:{} is alive -> {}, is it blocked -> {}", streamScheduler.getStreamId(), streamScheduler.isStreamAlive(), streamScheduler.isStreamBlocked());
//stream blocked means there is a connection to stream source and it's waiting to read a new packet
//Most of the time the problem is related to the stream source side.

if (!streamScheduler.isStreamBlocked() && !streamScheduler.isStreamAlive() &&
broadcast != null && AntMediaApplicationAdapter.BROADCAST_STATUS_TERMINATED_UNEXPECTEDLY.equals(broadcast.getStatus())) {
// if it's not blocked and it's not alive, stop the stream
logger.info("Stopping the stream because it is not getting updated(aka terminated_unexpectedly) and it will start for the streamId:{}", streamScheduler.getStreamId());
stopStreaming(streamScheduler.getStreamId());
//turn restart to true because we restart the stream to reconnect
restart = true;
}
}



//start streaming if broadcast object is in db(it means not deleted)
if (restart && broadcast != null)
{
Expand All @@ -553,7 +577,8 @@ public void controlStreamFetchers(boolean restart) {
}
});
}
else {
else
{
startStreaming(broadcast);
}
}
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/io/antmedia/websocket/WebSocketConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -652,4 +652,14 @@ private WebSocketConstants() {
* For example in conference case a participant use same websocket to publish its stream and to play the others
*/
public static final String USER_PUBLISH_ID = "userPublishId";

/**
* Notification to notify a new subtrack addition to a main track
*/
public static final String SUBTRACK_ADDED = "subtrackAdded";

/**
* Notification to notify a new subtrack removal to a main track
*/
public static final String SUBTRACK_REMOVED = "subtrackRemoved";
}
Loading

0 comments on commit 096bff8

Please sign in to comment.