[IE][VPU][XLink]: Port changes from releases/2021/4 to master (#8404)

This commit is contained in:
Daria Mityagina 2021-11-23 12:51:46 +03:00 committed by GitHub
parent 6feb981578
commit 277ff96564
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 527 additions and 81 deletions

View File

@ -6,19 +6,18 @@ include_guard(GLOBAL)
set(VPU_SUPPORTED_FIRMWARES usb-ma2x8x pcie-ma2x8x)
set(VPU_SUPPORTED_FIRMWARES_HASH
"bd0a40b82b1e024f99a175c0c967a61647d790a42a546b3f0ce8562107dc13dc"
"74efa0bb416ead2238878862aeca2f80d91268efb4859e09594536ef15908d0e")
"e65fcc1c6b0f3e9d814e53022c212ec0a2b83197a9df38badb298fb85ccf3acf"
"b11368fec2036d96fb703d2a40b171184fefe89f27e74a988ef1ca34260a2bc5")
#
# Default packages
#
set(FIRMWARE_PACKAGE_VERSION 1774)
set(FIRMWARE_PACKAGE_VERSION 1875)
set(VPU_CLC_MA2X8X_VERSION "movi-cltools-20.09.2")
#
# CMake variables to override default firmware files
#
list(LENGTH VPU_SUPPORTED_FIRMWARES num_firmwares)
math(EXPR num_firmwares "${num_firmwares} - 1")
foreach(idx RANGE 0 ${num_firmwares})

View File

@ -243,6 +243,9 @@ int pthread_sem_timedwait(pthread_sem_t *psem, const struct timespec *abstime) {
return pthread_sem_timed_or_blocked_wait(psem, abstime);
}
int pthread_sem_trywait(pthread_sem_t *psem) {
return pthread_sem_timed_or_blocked_wait(psem, NULL);
}
# ifdef __APPLE__
@ -258,6 +261,9 @@ int sem_post(sem_t *psem) {
int sem_wait(sem_t *psem) {
return pthread_sem_wait(psem);
}
int sem_trywait(sem_t *psem) {
return pthread_sem_trywait(psem);
}
int sem_timedwait(sem_t *psem, const struct timespec *abstime) {
return pthread_sem_timedwait(psem, abstime);
}

View File

@ -17,6 +17,7 @@ int pthread_sem_destroy(pthread_sem_t *psem);
int pthread_sem_post(pthread_sem_t *psem);
int pthread_sem_post_broadcast(pthread_sem_t *psem);
int pthread_sem_wait(pthread_sem_t *psem);
int pthread_sem_trywait(pthread_sem_t *psem);
int pthread_sem_timedwait(pthread_sem_t *psem, const struct timespec *abstime);
# ifdef __cplusplus
}
@ -36,6 +37,7 @@ int sem_init(sem_t *psem, int pshared, unsigned int value);
int sem_destroy(sem_t *psem);
int sem_post(sem_t *psem);
int sem_wait(sem_t *psem);
int sem_trywait(sem_t *psem);
int sem_timedwait(sem_t *psem, const struct timespec *abstime);
# ifdef __cplusplus

View File

@ -50,6 +50,7 @@ typedef struct sem_t_ * sem_t;
int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_wait(sem_t *sem);
int sem_timedwait(sem_t *sem, const struct timespec *ts);
int sem_trywait(sem_t *sem);
int sem_post(sem_t *sem);
int sem_destroy(sem_t *sem);

View File

@ -85,6 +85,19 @@ int sem_timedwait(sem_t *sem, const struct timespec *ts) {
}
//Wait for a semaphore
int sem_trywait(sem_t *sem){
if (sem == NULL || *sem == NULL) {
return ls_set_errno(EINVAL);
}
sem_t s = *sem;
if (WaitForSingleObject(s->handle, 0) != WAIT_OBJECT_0) {
return ls_set_errno(EINVAL);
}
return 0;
}
//Release a semaphone
int sem_post(sem_t *sem){
if (sem == NULL || *sem == NULL){

View File

@ -163,6 +163,14 @@ XLinkError_t XLinkWriteData(streamId_t streamId, const uint8_t* buffer, int size
*/
XLinkError_t XLinkReadData(streamId_t streamId, streamPacketDesc_t** packet);
/**
* @brief Releases specific data from stream
* @param[in] streamId stream link Id obtained from XLinkOpenStream call
* @param[in] packetId ID of the package to be released from the stream
* @return Status code of the operation: X_LINK_SUCCESS (0) for success
*/
XLinkError_t XLinkReleaseSpecificData(streamId_t streamId, streamPacketDesc_t* packetDesc);
/**
* @brief Releases data from stream - This should be called after the data obtained from
* XlinkReadData is processed
@ -180,6 +188,33 @@ XLinkError_t XLinkReleaseData(streamId_t streamId);
*/
XLinkError_t XLinkGetFillLevel(streamId_t streamId, int isRemote, int* fillLevel);
/**
* @brief Reads data from local stream with timeout in ms. Will only have something if it was written to by the remote.
* Limitations.
* If we reached timeout and the event waiter returned timeout error code
* there potentially will be a time frame when XLink still has not marked the read
* event as completed with timeout state but the event receiver has just received data
* from another end and match that data with the read event. In this case we can lose
* data. Should be fixed for the next release.
* @param[in] streamId stream link Id obtained from XLinkOpenStream call
* @param[out] packet structure containing output data buffer and received size
* @param[in] timeoutMs timeout for a read operation in milliseconds
* @return Status code of the operation: X_LINK_SUCCESS (0) for success
*/
XLinkError_t XLinkReadDataWithTimeout(streamId_t streamId, streamPacketDesc_t** packet, unsigned int timeoutMs);
/**
* @brief Sends a package to initiate the writing of data to a remote stream with timeout in ms
* XLinkWriteDataWithTimeout is not fully supported yet. The XLinkWriteData method is called instead.
* @warning Actual size of the written data is ALIGN_UP(size, 64)
* @param[in] streamId stream link Id obtained from XLinkOpenStream call
* @param[in] buffer data buffer to be transmitted
* @param[in] size size of the data to be transmitted
* @param[in] timeoutMs timeout for a write operation in milliseconds
* @return Status code of the operation: X_LINK_SUCCESS (0) for success
*/
XLinkError_t XLinkWriteDataWithTimeout(streamId_t streamId, const uint8_t* buffer, int size, unsigned int timeoutMs);
// ------------------------------------
// Device streams management. End.
// ------------------------------------
@ -201,11 +236,8 @@ XLinkError_t XLinkDisconnect(linkId_t id);
XLinkError_t XLinkGetAvailableStreams(linkId_t id);
XLinkError_t XLinkWriteDataWithTimeout(streamId_t streamId, const uint8_t* buffer, int size, unsigned int timeout);
XLinkError_t XLinkAsyncWriteData();
XLinkError_t XLinkReadDataWithTimeOut(streamId_t streamId, streamPacketDesc_t** packet, unsigned int timeout);
XLinkError_t XLinkSetDeviceOpenTimeOutMsec(unsigned int msec);
XLinkError_t XLinkSetCommonTimeOutMsec(unsigned int msec);

View File

@ -32,13 +32,17 @@ XLinkError_t DispatcherStart(xLinkDeviceHandle_t *deviceHandle);
int DispatcherClean(xLinkDeviceHandle_t *deviceHandle);
xLinkEvent_t* DispatcherAddEvent(xLinkEventOrigin_t origin, xLinkEvent_t *event);
int DispatcherWaitEventComplete(xLinkDeviceHandle_t *deviceHandle);
int DispatcherWaitEventComplete(xLinkDeviceHandle_t *deviceHandle, unsigned int timeoutMs);
char* TypeToStr(int type);
int DispatcherUnblockEvent(eventId_t id,
xLinkEventType_t type,
streamId_t stream,
void *xlinkFD);
xLinkEventType_t type,
streamId_t stream,
void *xlinkFD);
int DispatcherServeEvent(eventId_t id,
xLinkEventType_t type,
streamId_t stream,
void *xlinkFD);
#ifdef __cplusplus
}
#endif

View File

@ -19,6 +19,14 @@
x = base; \
}
// Avoid problems with unsigned, first compare and then give the new value
#define CIRCULAR_DECREMENT(x, maxVal) \
do { \
if ((x) == 0) \
(x) = (maxVal); \
else \
(x)--; \
} while(0)
#define COUNT_OF(x) ((sizeof(x)/sizeof(0[x])) / ((!(sizeof(x) % sizeof(0[x])))))
#ifndef MIN

View File

@ -16,8 +16,11 @@
extern "C"
{
#endif
#ifdef XLINK_MAX_STREAM_RES
#define MAX_POOLS_ALLOC XLINK_MAX_STREAM_RES
#else
#define MAX_POOLS_ALLOC 32
#endif
#define PACKET_LENGTH (64*1024)
typedef enum {

View File

@ -25,7 +25,11 @@ extern "C"
{
#endif
#ifdef XLINK_MAX_STREAM_RES
#define MAXIMUM_SEMAPHORES XLINK_MAX_STREAM_RES
#else
#define MAXIMUM_SEMAPHORES 32
#endif
#define __CACHE_LINE_SIZE 64
typedef int32_t eventId_t;
@ -75,6 +79,7 @@ typedef enum
XLINK_WRITE_REQ,
XLINK_READ_REQ,
XLINK_READ_REL_REQ,
XLINK_READ_REL_SPEC_REQ,
XLINK_CREATE_STREAM_REQ,
XLINK_CLOSE_STREAM_REQ,
XLINK_PING_REQ,
@ -84,6 +89,7 @@ typedef enum
XLINK_WRITE_RESP,
XLINK_READ_RESP,
XLINK_READ_REL_RESP,
XLINK_READ_REL_SPEC_RESP,
XLINK_CREATE_STREAM_RESP,
XLINK_CLOSE_STREAM_RESP,
XLINK_PING_RESP,

View File

@ -16,8 +16,13 @@ extern "C"
#endif
#define XLINK_MAX_NAME_SIZE 28
#ifdef XLINK_MAX_STREAM_RES
#define XLINK_MAX_STREAMS XLINK_MAX_STREAM_RES
#else
#define XLINK_MAX_STREAMS 32
#endif
#define XLINK_MAX_PACKETS_PER_STREAM 64
#define XLINK_NO_RW_TIMEOUT 0xFFFFFFFF
typedef enum{
X_LINK_SUCCESS = 0,

View File

@ -17,6 +17,7 @@
# include "win_synchapi.h"
# else
# include <pthread.h>
# include <unistd.h>
# ifdef __APPLE__
# include "pthread_semaphore.h"
# else
@ -58,6 +59,7 @@ int XLink_sem_destroy(XLink_sem_t* sem);
int XLink_sem_post(XLink_sem_t* sem);
int XLink_sem_wait(XLink_sem_t* sem);
int XLink_sem_timedwait(XLink_sem_t* sem, const struct timespec* abstime);
int XLink_sem_trywait(XLink_sem_t* sem);
//
// Helper functions for XLink semaphore wrappers.

View File

@ -33,8 +33,8 @@ static XLinkError_t checkEventHeader(xLinkEventHeader_t header);
#endif
static float timespec_diff(struct timespec *start, struct timespec *stop);
static XLinkError_t addEvent(xLinkEvent_t *event);
static XLinkError_t addEventWithPerf(xLinkEvent_t *event, float* opTime);
static XLinkError_t addEvent(xLinkEvent_t *event, unsigned int timeoutMs);
static XLinkError_t addEventWithPerf(xLinkEvent_t *event, float* opTime, unsigned int timeoutMs);
static XLinkError_t getLinkByStreamId(streamId_t streamId, xLinkDesc_t** out_link);
// ------------------------------------
@ -64,7 +64,7 @@ streamId_t XLinkOpenStream(linkId_t id, const char* name, int stream_write_size)
DispatcherAddEvent(EVENT_LOCAL, &event);
XLINK_RET_ERR_IF(
DispatcherWaitEventComplete(&link->deviceHandle),
DispatcherWaitEventComplete(&link->deviceHandle, XLINK_NO_RW_TIMEOUT),
INVALID_STREAM_ID);
#ifdef __PC__
@ -112,7 +112,7 @@ XLinkError_t XLinkCloseStream(streamId_t streamId)
XLINK_INIT_EVENT(event, streamId, XLINK_CLOSE_STREAM_REQ,
0, NULL, link->deviceHandle);
XLINK_RET_IF(addEvent(&event));
XLINK_RET_IF(addEvent(&event, XLINK_NO_RW_TIMEOUT));
return X_LINK_SUCCESS;
}
@ -121,7 +121,7 @@ XLinkError_t XLinkWriteData(streamId_t streamId, const uint8_t* buffer,
{
XLINK_RET_IF(buffer == NULL);
float opTime = 0;
float opTime = 0.0f;
xLinkDesc_t* link = NULL;
XLINK_RET_IF(getLinkByStreamId(streamId, &link));
streamId = EXTRACT_STREAM_ID(streamId);
@ -130,7 +130,7 @@ XLinkError_t XLinkWriteData(streamId_t streamId, const uint8_t* buffer,
XLINK_INIT_EVENT(event, streamId, XLINK_WRITE_REQ,
size,(void*)buffer, link->deviceHandle);
XLINK_RET_IF(addEventWithPerf(&event, &opTime));
XLINK_RET_IF(addEventWithPerf(&event, &opTime, XLINK_NO_RW_TIMEOUT));
if( glHandler->profEnable) {
glHandler->profilingData.totalWriteBytes += size;
@ -144,7 +144,7 @@ XLinkError_t XLinkReadData(streamId_t streamId, streamPacketDesc_t** packet)
{
XLINK_RET_IF(packet == NULL);
float opTime = 0;
float opTime = 0.0f;
xLinkDesc_t* link = NULL;
XLINK_RET_IF(getLinkByStreamId(streamId, &link));
streamId = EXTRACT_STREAM_ID(streamId);
@ -153,7 +153,60 @@ XLinkError_t XLinkReadData(streamId_t streamId, streamPacketDesc_t** packet)
XLINK_INIT_EVENT(event, streamId, XLINK_READ_REQ,
0, NULL, link->deviceHandle);
XLINK_RET_IF(addEventWithPerf(&event, &opTime));
XLINK_RET_IF(addEventWithPerf(&event, &opTime, XLINK_NO_RW_TIMEOUT));
*packet = (streamPacketDesc_t *)event.data;
if(*packet == NULL) {
return X_LINK_ERROR;
}
if( glHandler->profEnable) {
glHandler->profilingData.totalReadBytes += (*packet)->length;
glHandler->profilingData.totalReadTime += opTime;
}
return X_LINK_SUCCESS;
}
XLinkError_t XLinkWriteDataWithTimeout(streamId_t streamId, const uint8_t* buffer,
int size, unsigned int timeoutMs)
{
XLINK_RET_IF(buffer == NULL);
float opTime = 0.0f;
xLinkDesc_t* link = NULL;
XLINK_RET_IF(getLinkByStreamId(streamId, &link));
streamId = EXTRACT_STREAM_ID(streamId);
xLinkEvent_t event = {0};
XLINK_INIT_EVENT(event, streamId, XLINK_WRITE_REQ,
size,(void*)buffer, link->deviceHandle);
mvLog(MVLOG_WARN,"XLinkWriteDataWithTimeout is not fully supported yet. The XLinkWriteData method is called instead. Desired timeout = %d\n", timeoutMs);
XLINK_RET_IF_FAIL(addEventWithPerf(&event, &opTime, XLINK_NO_RW_TIMEOUT));
if( glHandler->profEnable) {
glHandler->profilingData.totalWriteBytes += size;
glHandler->profilingData.totalWriteTime += opTime;
}
return X_LINK_SUCCESS;
}
XLinkError_t XLinkReadDataWithTimeout(streamId_t streamId, streamPacketDesc_t** packet, unsigned int timeoutMs)
{
XLINK_RET_IF(packet == NULL);
float opTime = 0.0f;
xLinkDesc_t* link = NULL;
XLINK_RET_IF(getLinkByStreamId(streamId, &link));
streamId = EXTRACT_STREAM_ID(streamId);
xLinkEvent_t event = {0};
XLINK_INIT_EVENT(event, streamId, XLINK_READ_REQ,
0, NULL, link->deviceHandle);
XLINK_RET_IF_FAIL(addEventWithPerf(&event, &opTime, timeoutMs));
*packet = (streamPacketDesc_t *)event.data;
if(*packet == NULL) {
@ -178,7 +231,22 @@ XLinkError_t XLinkReleaseData(streamId_t streamId)
XLINK_INIT_EVENT(event, streamId, XLINK_READ_REL_REQ,
0, NULL, link->deviceHandle);
XLINK_RET_IF(addEvent(&event));
XLINK_RET_IF(addEvent(&event, XLINK_NO_RW_TIMEOUT));
return X_LINK_SUCCESS;
}
XLinkError_t XLinkReleaseSpecificData(streamId_t streamId, streamPacketDesc_t* packetDesc)
{
xLinkDesc_t* link = NULL;
XLINK_RET_IF(getLinkByStreamId(streamId, &link));
streamId = EXTRACT_STREAM_ID(streamId);
xLinkEvent_t event = {0};
XLINK_INIT_EVENT(event, streamId, XLINK_READ_REL_SPEC_REQ,
0, (void*)packetDesc->data, link->deviceHandle);
XLINK_RET_IF(addEvent(&event, XLINK_NO_RW_TIMEOUT));
return X_LINK_SUCCESS;
}
@ -244,7 +312,7 @@ float timespec_diff(struct timespec *start, struct timespec *stop)
return start->tv_nsec/ 1000000000.0f + start->tv_sec;
}
XLinkError_t addEvent(xLinkEvent_t *event)
XLinkError_t addEvent(xLinkEvent_t *event, unsigned int timeoutMs)
{
ASSERT_XLINK(event);
@ -255,8 +323,37 @@ XLinkError_t addEvent(xLinkEvent_t *event)
return X_LINK_ERROR;
}
if (DispatcherWaitEventComplete(&event->deviceHandle)) {
return X_LINK_TIMEOUT;
if (timeoutMs != XLINK_NO_RW_TIMEOUT) {
ASSERT_XLINK(event->header.type == XLINK_READ_REQ);
xLinkDesc_t* link;
getLinkByStreamId(event->header.streamId, &link);
if (DispatcherWaitEventComplete(&event->deviceHandle, timeoutMs)) // timeout reached
{
streamDesc_t* stream = getStreamById(event->deviceHandle.xLinkFD,
event->header.streamId);
ASSERT_XLINK(stream);
if (event->header.type == XLINK_READ_REQ)
{
// XLINK_READ_REQ is a local event. It is safe to serve it.
// Limitations.
// Possible vulnerability in this mechanism:
// If we reach timeout with DispatcherWaitEventComplete and before
// we call DispatcherServeEvent, the event actually comes,
// and gets served by XLink stack and event semaphore is posted.
DispatcherServeEvent(event->header.id, XLINK_READ_REQ, stream->id, event->deviceHandle.xLinkFD);
}
releaseStream(stream);
return X_LINK_TIMEOUT;
}
}
else // No timeout
{
if (DispatcherWaitEventComplete(&event->deviceHandle, timeoutMs))
{
return X_LINK_TIMEOUT;
}
}
XLINK_RET_ERR_IF(
@ -266,14 +363,14 @@ XLinkError_t addEvent(xLinkEvent_t *event)
return X_LINK_SUCCESS;
}
XLinkError_t addEventWithPerf(xLinkEvent_t *event, float* opTime)
XLinkError_t addEventWithPerf(xLinkEvent_t *event, float* opTime, unsigned int timeoutMs)
{
ASSERT_XLINK(opTime);
struct timespec start, end;
clock_gettime(CLOCK_REALTIME, &start);
XLINK_RET_IF_FAIL(addEvent(event));
XLINK_RET_IF_FAIL(addEvent(event, timeoutMs));
clock_gettime(CLOCK_REALTIME, &end);
*opTime = timespec_diff(&start, &end);

View File

@ -100,19 +100,6 @@ XLinkError_t XLinkGetAvailableStreams(linkId_t id)
return X_LINK_NOT_IMPLEMENTED;
}
XLinkError_t XLinkWriteDataWithTimeout(streamId_t streamId, const uint8_t* buffer,
int size, unsigned int timeout)
{
(void)timeout;
return XLinkWriteData(streamId, buffer, size);
}
XLinkError_t XLinkReadDataWithTimeOut(streamId_t streamId, streamPacketDesc_t** packet, unsigned int timeout)
{
(void)timeout;
return XLinkReadData(streamId, packet);
}
XLinkError_t XLinkAsyncWriteData()
{
return X_LINK_NOT_IMPLEMENTED;

View File

@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
//
#include <errno.h>
#include "stdio.h"
#include "stdint.h"
#include "string.h"
@ -130,7 +131,9 @@ XLinkError_t XLinkInitialize(XLinkGlobalHandler_t* globalHandler)
temp.protocol = X_LINK_ANY_PROTOCOL;
XLINK_RET_IF_FAIL(DispatcherStart(&temp)); //myriad has one
sem_wait(&pingSem);
while(((sem_wait(&pingSem) == -1) && errno == EINTR)
continue;
#endif
return X_LINK_SUCCESS;
@ -204,7 +207,7 @@ XLinkError_t XLinkConnect(XLinkHandler_t* handler)
event.deviceHandle = link->deviceHandle;
DispatcherAddEvent(EVENT_LOCAL, &event);
if (DispatcherWaitEventComplete(&link->deviceHandle)) {
if (DispatcherWaitEventComplete(&link->deviceHandle, XLINK_NO_RW_TIMEOUT)) {
DispatcherClean(&link->deviceHandle);
return X_LINK_TIMEOUT;
}
@ -250,10 +253,13 @@ XLinkError_t XLinkResetRemote(linkId_t id)
event.deviceHandle = link->deviceHandle;
mvLog(MVLOG_DEBUG, "sending reset remote event\n");
DispatcherAddEvent(EVENT_LOCAL, &event);
XLINK_RET_ERR_IF(DispatcherWaitEventComplete(&link->deviceHandle),
XLINK_RET_ERR_IF(DispatcherWaitEventComplete(&link->deviceHandle, XLINK_NO_RW_TIMEOUT),
X_LINK_TIMEOUT);
if(XLink_sem_wait(&link->dispatcherClosedSem)) {
int rc;
while(((rc = XLink_sem_wait(&link->dispatcherClosedSem)) == -1) && errno == EINTR)
continue;
if(rc) {
mvLog(MVLOG_ERROR,"can't wait dispatcherClosedSem\n");
return X_LINK_ERROR;
}

View File

@ -11,6 +11,7 @@
#define _GNU_SOURCE // fix for warning: implicit declaration of function pthread_setname_np
#endif
#include <errno.h>
#include "stdio.h"
#include "stdint.h"
#include "stdlib.h"
@ -18,6 +19,17 @@
#include <assert.h>
#include <stdlib.h>
#if (defined(_WIN32) || defined(_WIN64))
# include "win_pthread.h"
# include "win_semaphore.h"
#else
# include <pthread.h>
# include <unistd.h>
# ifndef __APPLE__
# include <semaphore.h>
# endif
#endif
#include "XLinkDispatcher.h"
#include "XLinkMacros.h"
#include "XLinkPrivateDefines.h"
@ -72,6 +84,8 @@ typedef struct {
int queueProcPriority;
pthread_mutex_t queueMutex;
XLink_sem_t addEventSem;
XLink_sem_t notifyDispatcherSem;
volatile uint32_t resetXLink;
@ -101,6 +115,7 @@ xLinkSchedulerState_t schedulerState[MAX_SCHEDULERS];
sem_t addSchedulerSem;
static pthread_mutex_t clean_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t num_schedulers_mutex = PTHREAD_MUTEX_INITIALIZER;
// ------------------------------------
// Global fields declaration. End.
@ -237,6 +252,10 @@ XLinkError_t DispatcherStart(xLinkDeviceHandle_t *deviceHandle)
perror("Can't create semaphore\n");
return -1;
}
if (pthread_mutex_init(&(schedulerState[idx].queueMutex), NULL) != 0) {
perror("pthread_mutex_init error");
return -1;
}
if (XLink_sem_init(&schedulerState[idx].notifyDispatcherSem, 0, 0)) {
perror("Can't create semaphore\n");
}
@ -259,9 +278,22 @@ XLinkError_t DispatcherStart(xLinkDeviceHandle_t *deviceHandle)
mvLog(MVLOG_ERROR,"pthread_attr_setschedpolicy error");
pthread_attr_destroy(&attr);
}
#ifdef XLINK_THREADS_PRIORITY
struct sched_param param;
if (pthread_attr_getschedparam(&attr, &param) != 0) {
mvLog(MVLOG_ERROR,"pthread_attr_getschedparam error");
pthread_attr_destroy(&attr);
}
param.sched_priority = XLINK_THREADS_PRIORITY;
if (pthread_attr_setschedparam(&attr, &param) != 0) {
mvLog(MVLOG_ERROR,"pthread_attr_setschedparam error");
pthread_attr_destroy(&attr);
}
#endif
#endif
sem_wait(&addSchedulerSem);
while(((sem_wait(&addSchedulerSem) == -1) && errno == EINTR))
continue;
mvLog(MVLOG_DEBUG,"%s() starting a new thread - schedulerId %d \n", __func__, idx);
int sc = pthread_create(&schedulerState[idx].xLinkThreadId,
&attr,
@ -314,7 +346,10 @@ xLinkEvent_t* DispatcherAddEvent(xLinkEventOrigin_t origin, xLinkEvent_t *event)
return NULL;
}
mvLog(MVLOG_DEBUG, "Receiving event %s %d\n", TypeToStr(event->header.type), origin);
if (XLink_sem_wait(&curr->addEventSem)) {
int rc;
while(((rc = XLink_sem_wait(&curr->addEventSem)) == -1) && errno == EINTR)
continue;
if (rc) {
mvLog(MVLOG_ERROR,"can't wait semaphore\n");
return NULL;
}
@ -349,7 +384,7 @@ xLinkEvent_t* DispatcherAddEvent(xLinkEventOrigin_t origin, xLinkEvent_t *event)
return ev;
}
int DispatcherWaitEventComplete(xLinkDeviceHandle_t *deviceHandle)
int DispatcherWaitEventComplete(xLinkDeviceHandle_t *deviceHandle, unsigned int timeoutMs)
{
xLinkSchedulerState_t* curr = findCorrespondingScheduler(deviceHandle->xLinkFD);
ASSERT_XLINK(curr != NULL);
@ -359,19 +394,41 @@ int DispatcherWaitEventComplete(xLinkDeviceHandle_t *deviceHandle)
return -1;
}
int rc = XLink_sem_wait(id);
int rc = 0;
if (timeoutMs != XLINK_NO_RW_TIMEOUT) {
// This is a workaround for sem_timedwait being influenced by the system clock change.
// This is a temporary solution. TODO: replace this with something more efficient.
while (timeoutMs--) {
rc = XLink_sem_trywait(id);
if (!rc) {
break;
} else {
#if (defined(_WIN32) || defined(_WIN64) )
Sleep(1);
#else
usleep(1000);
#endif
}
}
} else {
while(((rc = XLink_sem_wait(id)) == -1) && errno == EINTR)
continue;
}
#ifdef __PC__
if (rc) {
xLinkEvent_t event = {0};
event.header.type = XLINK_RESET_REQ;
event.deviceHandle = *deviceHandle;
mvLog(MVLOG_ERROR,"waiting is timeout, sending reset remote event");
DispatcherAddEvent(EVENT_LOCAL, &event);
id = getSem(pthread_self(), curr);
if (id == NULL || XLink_sem_wait(id)) {
dispatcherReset(curr);
xLinkEvent_t event = {0};
event.header.type = XLINK_RESET_REQ;
event.deviceHandle = *deviceHandle;
mvLog(MVLOG_ERROR,"waiting is timeout, sending reset remote event");
DispatcherAddEvent(EVENT_LOCAL, &event);
id = getSem(pthread_self(), curr);
int rc;
while(((rc = XLink_sem_wait(id)) == -1) && errno == EINTR)
continue;
if (id == NULL || rc) {
dispatcherReset(curr);
}
}
}
#endif
return rc;
@ -384,6 +441,7 @@ char* TypeToStr(int type)
case XLINK_WRITE_REQ: return "XLINK_WRITE_REQ";
case XLINK_READ_REQ: return "XLINK_READ_REQ";
case XLINK_READ_REL_REQ: return "XLINK_READ_REL_REQ";
case XLINK_READ_REL_SPEC_REQ: return "XLINK_READ_REL_SPEC_REQ";
case XLINK_CREATE_STREAM_REQ:return "XLINK_CREATE_STREAM_REQ";
case XLINK_CLOSE_STREAM_REQ: return "XLINK_CLOSE_STREAM_REQ";
case XLINK_PING_REQ: return "XLINK_PING_REQ";
@ -392,6 +450,7 @@ char* TypeToStr(int type)
case XLINK_WRITE_RESP: return "XLINK_WRITE_RESP";
case XLINK_READ_RESP: return "XLINK_READ_RESP";
case XLINK_READ_REL_RESP: return "XLINK_READ_REL_RESP";
case XLINK_READ_REL_SPEC_RESP: return "XLINK_READ_REL_SPEC_RESP";
case XLINK_CREATE_STREAM_RESP: return "XLINK_CREATE_STREAM_RESP";
case XLINK_CLOSE_STREAM_RESP: return "XLINK_CLOSE_STREAM_RESP";
case XLINK_PING_RESP: return "XLINK_PING_RESP";
@ -410,6 +469,8 @@ int DispatcherUnblockEvent(eventId_t id, xLinkEventType_t type, streamId_t strea
mvLog(MVLOG_DEBUG,"unblock\n");
xLinkEventPriv_t* blockedEvent;
XLINK_RET_ERR_IF(pthread_mutex_lock(&(curr->queueMutex)) != 0, 1);
for (blockedEvent = curr->lQueue.q;
blockedEvent < curr->lQueue.q + MAX_EVENTS;
blockedEvent++)
@ -426,6 +487,7 @@ int DispatcherUnblockEvent(eventId_t id, xLinkEventType_t type, streamId_t strea
if (XLink_sem_post(&curr->notifyDispatcherSem)){
mvLog(MVLOG_ERROR, "can't post semaphore\n");
}
XLINK_RET_ERR_IF(pthread_mutex_unlock(&(curr->queueMutex)) != 0, 1);
return 1;
} else {
mvLog(MVLOG_DEBUG,"%d %s\n",
@ -433,6 +495,34 @@ int DispatcherUnblockEvent(eventId_t id, xLinkEventType_t type, streamId_t strea
TypeToStr((int)blockedEvent->packet.header.type));
}
}
XLINK_RET_ERR_IF(pthread_mutex_unlock(&(curr->queueMutex)) != 0, 1);
return 0;
}
int DispatcherServeEvent(eventId_t id, xLinkEventType_t type, streamId_t stream, void *xlinkFD)
{
xLinkSchedulerState_t* curr = findCorrespondingScheduler(xlinkFD);
ASSERT_XLINK(curr != NULL);
xLinkEventPriv_t* event;
XLINK_RET_ERR_IF(pthread_mutex_lock(&(curr->queueMutex)) != 0, 1);
for (event = curr->lQueue.q;
event < curr->lQueue.q + MAX_EVENTS;
event++)
{
if (((event->packet.header.id == id || id == -1)
&& event->packet.header.type == type
&& event->packet.header.streamId == stream))
{
mvLog(MVLOG_DEBUG,"served**************** %d %s\n",
(int)event->packet.header.id,
TypeToStr((int)event->packet.header.type));
event->isServed = EVENT_SERVED;
XLINK_RET_ERR_IF(pthread_mutex_unlock(&(curr->queueMutex)) != 0, 1);
return 1;
}
}
XLINK_RET_ERR_IF(pthread_mutex_unlock(&(curr->queueMutex)) != 0, 1);
return 0;
}
@ -546,8 +636,10 @@ static void* eventReader(void* ctx)
if (sc) {
mvLog(MVLOG_DEBUG,"Failed to receive event (err %d)", sc);
XLINK_RET_ERR_IF(pthread_mutex_lock(&(curr->queueMutex)) != 0, NULL);
dispatcherFreeEvents(&curr->lQueue, EVENT_PENDING);
dispatcherFreeEvents(&curr->lQueue, EVENT_BLOCKED);
XLINK_RET_ERR_IF(pthread_mutex_unlock(&(curr->queueMutex)) != 0, NULL);
continue;
}
@ -581,6 +673,7 @@ static void* eventSchedulerRun(void* ctx)
mvLog(MVLOG_ERROR,"pthread_attr_init error");
return NULL;
}
#ifndef __PC__
if (pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED) != 0) {
pthread_attr_destroy(&attr);
@ -592,6 +685,18 @@ static void* eventSchedulerRun(void* ctx)
mvLog(MVLOG_ERROR,"pthread_attr_setschedpolicy error");
return NULL;
}
#ifdef XLINK_THREADS_PRIORITY
struct sched_param param;
if (pthread_attr_getschedparam(&attr, &param) != 0) {
mvLog(MVLOG_ERROR,"pthread_attr_getschedparam error");
pthread_attr_destroy(&attr);
}
param.sched_priority = XLINK_THREADS_PRIORITY;
if (pthread_attr_setschedparam(&attr, &param) != 0) {
mvLog(MVLOG_ERROR,"pthread_attr_setschedparam error");
pthread_attr_destroy(&attr);
}
#endif
#endif
sc = pthread_create(&readerThreadId, &attr, eventReader, curr);
if (sc) {
@ -679,17 +784,24 @@ int findAvailableScheduler()
static xLinkSchedulerState_t* findCorrespondingScheduler(void* xLinkFD)
{
int i;
XLINK_RET_ERR_IF(pthread_mutex_lock(&num_schedulers_mutex) != 0, NULL);
if (xLinkFD == NULL) { //in case of myriad there should be one scheduler
if (numSchedulers == 1)
if (numSchedulers == 1) {
XLINK_RET_ERR_IF(pthread_mutex_unlock(&num_schedulers_mutex) != 0, NULL);
return &schedulerState[0];
else
NULL;
} else {
XLINK_RET_ERR_IF(pthread_mutex_unlock(&num_schedulers_mutex) != 0, NULL);
return NULL;
}
}
for (i=0; i < MAX_SCHEDULERS; i++)
if (schedulerState[i].schedulerId != -1 &&
schedulerState[i].deviceHandle.xLinkFD == xLinkFD)
return &schedulerState[i];
schedulerState[i].deviceHandle.xLinkFD == xLinkFD) {
XLINK_RET_ERR_IF(pthread_mutex_unlock(&num_schedulers_mutex) != 0, NULL);
return &schedulerState[i];
}
XLINK_RET_ERR_IF(pthread_mutex_unlock(&num_schedulers_mutex) != 0, NULL);
return NULL;
}
@ -703,7 +815,7 @@ static int dispatcherRequestServe(xLinkEventPriv_t * event, xLinkSchedulerState_
(header->flags.bitField.ack == 0
&& header->flags.bitField.nack == 1)){ //this event is served locally, or it is failed
postAndMarkEventServed(event);
}else if (header->flags.bitField.ack == 1
} else if (header->flags.bitField.ack == 1
&& header->flags.bitField.nack == 0){
event->isServed = EVENT_PENDING;
mvLog(MVLOG_DEBUG,"------------------------UNserved %s\n",
@ -800,15 +912,16 @@ static xLinkEvent_t* addNextQueueElemToProc(xLinkSchedulerState_t* curr,
eventQueueHandler_t *q, xLinkEvent_t* event,
XLink_sem_t* sem, xLinkEventOrigin_t o){
xLinkEvent_t* ev;
XLINK_RET_ERR_IF(pthread_mutex_lock(&(curr->queueMutex)) != 0, NULL);
xLinkEventPriv_t* eventP = getNextElementWithState(q->base, q->end, q->cur, EVENT_SERVED);
if (eventP == NULL) {
mvLog(MVLOG_ERROR, "getNextElementWithState returned NULL");
XLINK_RET_ERR_IF(pthread_mutex_unlock(&(curr->queueMutex)) != 0, NULL);
return NULL;
}
mvLog(MVLOG_DEBUG, "Received event %s %d", TypeToStr(event->header.type), o);
ev = &eventP->packet;
(void)curr;
eventP->sem = sem;
eventP->packet = *event;
eventP->origin = o;
@ -821,6 +934,7 @@ static xLinkEvent_t* addNextQueueElemToProc(xLinkSchedulerState_t* curr,
q->cur = eventP;
eventP->isServed = EVENT_ALLOCATED;
CIRCULAR_INCREMENT_BASE(q->cur, q->end, q->base);
XLINK_RET_ERR_IF(pthread_mutex_unlock(&(curr->queueMutex)) != 0, NULL);
return ev;
}
@ -828,13 +942,18 @@ static xLinkEventPriv_t* dispatcherGetNextEvent(xLinkSchedulerState_t* curr)
{
XLINK_RET_ERR_IF(curr == NULL, NULL);
if (XLink_sem_wait(&curr->notifyDispatcherSem)) {
int rc;
while(((rc = XLink_sem_wait(&curr->notifyDispatcherSem)) == -1) && errno == EINTR)
continue;
if (rc) {
mvLog(MVLOG_ERROR,"can't post semaphore\n");
}
xLinkEventPriv_t* event = NULL;
XLINK_RET_ERR_IF(pthread_mutex_lock(&(curr->queueMutex)) != 0, NULL);
event = searchForReadyEvent(curr);
if (event) {
XLINK_RET_ERR_IF(pthread_mutex_unlock(&(curr->queueMutex)) != 0, NULL);
return event;
}
@ -844,10 +963,12 @@ static xLinkEventPriv_t* dispatcherGetNextEvent(xLinkSchedulerState_t* curr)
event = getNextQueueElemToProc(hPriorityQueue);
if (event) {
XLINK_RET_ERR_IF(pthread_mutex_unlock(&(curr->queueMutex)) != 0, NULL);
return event;
}
event = getNextQueueElemToProc(lPriorityQueue);
XLINK_RET_ERR_IF(pthread_mutex_unlock(&(curr->queueMutex)) != 0, NULL);
return event;
}
@ -873,10 +994,14 @@ static int dispatcherClean(xLinkSchedulerState_t* curr)
mvLog(MVLOG_INFO, "dropped event is %s, status %d\n",
TypeToStr(event->packet.header.type), event->isServed);
XLINK_RET_ERR_IF(pthread_mutex_lock(&(curr->queueMutex)) != 0, 1);
postAndMarkEventServed(event);
XLINK_RET_ERR_IF(pthread_mutex_unlock(&(curr->queueMutex)) != 0, 1);
event = dispatcherGetNextEvent(curr);
}
XLINK_RET_ERR_IF(pthread_mutex_lock(&(curr->queueMutex)) != 0, 1);
dispatcherFreeEvents(&curr->lQueue, EVENT_PENDING);
dispatcherFreeEvents(&curr->lQueue, EVENT_BLOCKED);
@ -893,10 +1018,13 @@ static int dispatcherClean(xLinkSchedulerState_t* curr)
}
numSchedulers--;
XLINK_RET_ERR_IF(pthread_mutex_unlock(&(curr->queueMutex)) != 0, 1);
mvLog(MVLOG_INFO, "Clean Dispatcher Successfully...");
if(pthread_mutex_unlock(&clean_mutex) != 0) {
mvLog(MVLOG_ERROR, "Failed to unlock clean_mutex after clearing dispatcher");
}
XLINK_RET_ERR_IF(pthread_mutex_destroy(&(curr->queueMutex)) != 0, 1);
return 0;
}
@ -945,11 +1073,13 @@ static XLinkError_t sendEvents(xLinkSchedulerState_t* curr) {
event->packet.header.flags.bitField.nack = 1;
event->packet.header.flags.bitField.ack = 0;
XLINK_RET_ERR_IF(pthread_mutex_lock(&(curr->queueMutex)) != 0, X_LINK_ERROR);
if (event->origin == EVENT_LOCAL){
dispatcherRequestServe(event, curr);
} else {
dispatcherResponseServe(event, curr);
}
XLINK_RET_ERR_IF(pthread_mutex_unlock(&(curr->queueMutex)) != 0, X_LINK_ERROR);
continue;
}
@ -965,8 +1095,9 @@ static XLinkError_t sendEvents(xLinkSchedulerState_t* curr) {
}
res = getResp(&event->packet, &response.packet);
if (isEventTypeRequest(event)){
if (event->origin == EVENT_LOCAL){ //we need to do this for locals only
if (isEventTypeRequest(event)) {
XLINK_RET_ERR_IF(pthread_mutex_lock(&(curr->queueMutex)) != 0, X_LINK_ERROR);
if (event->origin == EVENT_LOCAL) { //we need to do this for locals only
if(dispatcherRequestServe(event, curr)) {
mvLog(MVLOG_ERROR, "Failed to serve local event. "
"Event: id=%d, type=%s, streamId=%u, streamName=%s",
@ -992,16 +1123,23 @@ static XLinkError_t sendEvents(xLinkSchedulerState_t* curr) {
}
}
#endif // __PC__
XLINK_RET_ERR_IF(pthread_mutex_unlock(&(curr->queueMutex)) != 0, X_LINK_ERROR);
if (glControlFunc->eventSend(toSend) != 0) {
XLINK_RET_ERR_IF(pthread_mutex_lock(&(curr->queueMutex)) != 0, X_LINK_ERROR);
dispatcherFreeEvents(&curr->lQueue, EVENT_PENDING);
dispatcherFreeEvents(&curr->lQueue, EVENT_BLOCKED);
XLINK_RET_ERR_IF(pthread_mutex_unlock(&(curr->queueMutex)) != 0, X_LINK_ERROR);
mvLog(MVLOG_ERROR, "Event sending failed");
}
} else {
XLINK_RET_ERR_IF(pthread_mutex_unlock(&(curr->queueMutex)) != 0, X_LINK_ERROR);
}
} else {
XLINK_RET_ERR_IF(pthread_mutex_lock(&(curr->queueMutex)) != 0, X_LINK_ERROR);
if (event->origin == EVENT_REMOTE){ // match remote response with the local request
dispatcherResponseServe(event, curr);
}
XLINK_RET_ERR_IF(pthread_mutex_unlock(&(curr->queueMutex)) != 0, X_LINK_ERROR);
}
if (event->origin == EVENT_REMOTE){

View File

@ -26,6 +26,7 @@ static int isStreamSpaceEnoughFor(streamDesc_t* stream, uint32_t size);
static streamPacketDesc_t* getPacketFromStream(streamDesc_t* stream);
static int releasePacketFromStream(streamDesc_t* stream, uint32_t* releasedSize);
static int releaseSpecificPacketFromStream(streamDesc_t* stream, uint32_t* releasedSize, uint8_t* data);
static int addNewPacketToStream(streamDesc_t* stream, void* buffer, uint32_t size);
static int handleIncomingEvent(xLinkEvent_t* event);
@ -104,7 +105,6 @@ int dispatcherLocalEventGetResponse(xLinkEvent_t* event, xLinkEvent_t* response)
switch (event->header.type){
case XLINK_WRITE_REQ:
{
//in case local tries to write after it issues close (writeSize is zero)
stream = getStreamById(event->deviceHandle.xLinkFD, event->header.streamId);
@ -174,10 +174,32 @@ int dispatcherLocalEventGetResponse(xLinkEvent_t* event, xLinkEvent_t* response)
releaseStream(stream);
break;
}
case XLINK_READ_REL_SPEC_REQ:
{
uint8_t* data = (uint8_t*)event->data;
stream = getStreamById(event->deviceHandle.xLinkFD, event->header.streamId);
ASSERT_XLINK(stream);
XLINK_EVENT_ACKNOWLEDGE(event);
uint32_t releasedSize = 0;
releaseSpecificPacketFromStream(stream, &releasedSize, data);
event->header.size = releasedSize;
releaseStream(stream);
break;
}
case XLINK_CREATE_STREAM_REQ:
{
XLINK_EVENT_ACKNOWLEDGE(event);
mvLog(MVLOG_DEBUG,"XLINK_CREATE_STREAM_REQ - do nothing\n");
#ifdef __PC__
event->header.streamId = XLinkAddOrUpdateStream(event->deviceHandle.xLinkFD,
event->header.streamName,
event->header.size, 0,
INVALID_STREAM_ID);
mvLog(MVLOG_DEBUG, "XLINK_CREATE_STREAM_REQ - stream has been just opened with id %ld\n",
event->header.streamId);
#else
mvLog(MVLOG_DEBUG, "XLINK_CREATE_STREAM_REQ - do nothing. Stream will be "
"opened with forced id accordingly to response from the host\n");
#endif
break;
}
case XLINK_CLOSE_STREAM_REQ:
@ -212,6 +234,7 @@ int dispatcherLocalEventGetResponse(xLinkEvent_t* event, xLinkEvent_t* response)
case XLINK_WRITE_RESP:
case XLINK_READ_RESP:
case XLINK_READ_REL_RESP:
case XLINK_READ_REL_SPEC_RESP:
case XLINK_CREATE_STREAM_RESP:
case XLINK_CLOSE_STREAM_RESP:
case XLINK_PING_RESP:
@ -250,19 +273,44 @@ int dispatcherRemoteEventGetResponse(xLinkEvent_t* event, xLinkEvent_t* response
response->deviceHandle = event->deviceHandle;
XLINK_EVENT_ACKNOWLEDGE(response);
// we got some data. We should unblock a blocked read
int xxx = DispatcherUnblockEvent(-1,
XLINK_READ_REQ,
response->header.streamId,
event->deviceHandle.xLinkFD);
XLINK_READ_REQ,
response->header.streamId,
event->deviceHandle.xLinkFD);
(void) xxx;
mvLog(MVLOG_DEBUG,"unblocked from stream %d %d\n",
(int)response->header.streamId, (int)xxx);
(int)response->header.streamId, (int)xxx);
}
break;
case XLINK_READ_REQ:
break;
case XLINK_READ_REL_SPEC_REQ:
XLINK_EVENT_ACKNOWLEDGE(response);
response->header.type = XLINK_READ_REL_SPEC_RESP;
response->deviceHandle = event->deviceHandle;
stream = getStreamById(event->deviceHandle.xLinkFD,
event->header.streamId);
ASSERT_XLINK(stream);
stream->remoteFillLevel -= event->header.size;
stream->remoteFillPacketLevel--;
mvLog(MVLOG_DEBUG,"S%d: Got remote release of %ld, remote fill level %ld out of %ld %ld\n",
event->header.streamId, event->header.size, stream->remoteFillLevel, stream->writeSize, stream->readSize);
releaseStream(stream);
DispatcherUnblockEvent(-1, XLINK_WRITE_REQ, event->header.streamId,
event->deviceHandle.xLinkFD);
//with every released packet check if the stream is already marked for close
if (stream->closeStreamInitiated && stream->localFillLevel == 0)
{
mvLog(MVLOG_DEBUG,"%s() Unblock close STREAM\n", __func__);
DispatcherUnblockEvent(-1,
XLINK_CLOSE_STREAM_REQ,
event->header.streamId,
event->deviceHandle.xLinkFD);
}
break;
case XLINK_READ_REL_REQ:
XLINK_EVENT_ACKNOWLEDGE(response);
response->header.type = XLINK_READ_REL_RESP;
@ -294,11 +342,17 @@ int dispatcherRemoteEventGetResponse(xLinkEvent_t* event, xLinkEvent_t* response
XLINK_EVENT_ACKNOWLEDGE(response);
response->header.type = XLINK_CREATE_STREAM_RESP;
//write size from remote means read size for this peer
#ifndef __PC__
response->header.streamId = XLinkAddOrUpdateStream(event->deviceHandle.xLinkFD,
event->header.streamName,
0, event->header.size,
event->header.streamId);
#else
response->header.streamId = XLinkAddOrUpdateStream(event->deviceHandle.xLinkFD,
event->header.streamName,
0, event->header.size,
INVALID_STREAM_ID);
#endif
if (response->header.streamId == INVALID_STREAM_ID) {
response->header.flags.bitField.ack = 0;
response->header.flags.bitField.sizeTooBig = 1;
@ -374,15 +428,22 @@ int dispatcherRemoteEventGetResponse(xLinkEvent_t* event, xLinkEvent_t* response
break;
case XLINK_READ_REL_RESP:
break;
case XLINK_READ_REL_SPEC_RESP:
break;
case XLINK_CREATE_STREAM_RESP:
{
// write_size from the response the size of the buffer from the remote
#ifndef __PC__
response->header.streamId = XLinkAddOrUpdateStream(event->deviceHandle.xLinkFD,
event->header.streamName,
event->header.size, 0,
event->header.streamId);
XLINK_RET_IF(response->header.streamId
== INVALID_STREAM_ID);
mvLog(MVLOG_DEBUG, "XLINK_CREATE_STREAM_REQ - stream has been just opened "
"with forced id=%ld accordingly to response from the host\n",
response->header.streamId);
#endif
response->deviceHandle = event->deviceHandle;
break;
}
@ -521,6 +582,58 @@ int releasePacketFromStream(streamDesc_t* stream, uint32_t* releasedSize)
return 0;
}
int releaseSpecificPacketFromStream(streamDesc_t* stream, uint32_t* releasedSize, uint8_t* data) {
if (stream->blockedPackets == 0) {
mvLog(MVLOG_ERROR,"There is no packet to release\n");
return 0; // ignore this, although this is a big problem on application side
}
uint32_t packetId = stream->firstPacket;
uint32_t found = 0;
do {
if (stream->packets[packetId].data == data) {
found = 1;
break;
}
CIRCULAR_INCREMENT(packetId, XLINK_MAX_PACKETS_PER_STREAM);
} while (packetId != stream->firstPacketUnused);
ASSERT_XLINK(found);
streamPacketDesc_t* currPack = &stream->packets[packetId];
if (currPack->length == 0) {
mvLog(MVLOG_ERROR, "Packet with ID %d is empty\n", packetId);
}
stream->localFillLevel -= currPack->length;
mvLog(MVLOG_DEBUG, "S%d: Got release of %ld , current local fill level is %ld out of %ld %ld\n",
stream->id, currPack->length, stream->localFillLevel, stream->readSize, stream->writeSize);
XLinkPlatformDeallocateData(currPack->data,
ALIGN_UP_INT32((int32_t) currPack->length, __CACHE_LINE_SIZE), __CACHE_LINE_SIZE);
stream->blockedPackets--;
if (releasedSize) {
*releasedSize = currPack->length;
}
if (packetId != stream->firstPacket) {
uint32_t currIndex = packetId;
uint32_t nextIndex = currIndex;
CIRCULAR_INCREMENT(nextIndex, XLINK_MAX_PACKETS_PER_STREAM);
while (currIndex != stream->firstPacketFree) {
stream->packets[currIndex] = stream->packets[nextIndex];
currIndex = nextIndex;
CIRCULAR_INCREMENT(nextIndex, XLINK_MAX_PACKETS_PER_STREAM);
}
CIRCULAR_DECREMENT(stream->firstPacketUnused, (XLINK_MAX_PACKETS_PER_STREAM - 1));
CIRCULAR_DECREMENT(stream->firstPacketFree, (XLINK_MAX_PACKETS_PER_STREAM - 1));
} else {
CIRCULAR_INCREMENT(stream->firstPacket, XLINK_MAX_PACKETS_PER_STREAM);
}
return 0;
}
int addNewPacketToStream(streamDesc_t* stream, void* buffer, uint32_t size) {
if (stream->availablePackets + stream->blockedPackets < XLINK_MAX_PACKETS_PER_STREAM)
{

View File

@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
//
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include "stdlib.h"
@ -55,8 +56,13 @@ streamDesc_t* getStreamById(void* fd, streamId_t id)
int stream;
for (stream = 0; stream < XLINK_MAX_STREAMS; stream++) {
if (link->availableStreams[stream].id == id) {
XLink_sem_wait(&link->availableStreams[stream].sem);
int rc = 0;
while(((rc = XLink_sem_wait(&link->availableStreams[stream].sem)) == -1) && errno == EINTR)
continue;
if (rc) {
mvLog(MVLOG_ERROR,"can't wait semaphore\n");
return NULL;
}
return &link->availableStreams[stream];
}
}
@ -70,8 +76,13 @@ streamDesc_t* getStreamByName(xLinkDesc_t* link, const char* name)
for (stream = 0; stream < XLINK_MAX_STREAMS; stream++) {
if (link->availableStreams[stream].id != INVALID_STREAM_ID &&
strcmp(link->availableStreams[stream].name, name) == 0) {
XLink_sem_wait(&link->availableStreams[stream].sem);
int rc = 0;
while(((rc = XLink_sem_wait(&link->availableStreams[stream].sem)) == -1) && errno == EINTR)
continue;
if (rc) {
mvLog(MVLOG_ERROR,"can't wait semaphore\n");
return NULL;
}
return &link->availableStreams[stream];
}
}

View File

@ -93,7 +93,9 @@ int XLink_sem_wait(XLink_sem_t* sem)
XLINK_RET_ERR_IF(sem == NULL, -1);
XLINK_RET_IF_FAIL(XLink_sem_inc(sem));
int ret = sem_wait(&sem->psem);
int ret;
while(((ret = sem_wait(&sem->psem) == -1) && errno == EINTR))
continue;
XLINK_RET_IF_FAIL(XLink_sem_dec(sem));
return ret;
@ -113,6 +115,17 @@ int XLink_sem_timedwait(XLink_sem_t* sem, const struct timespec* abstime)
return ret;
}
int XLink_sem_trywait(XLink_sem_t* sem)
{
XLINK_RET_ERR_IF(sem == NULL, -1);
XLINK_RET_IF_FAIL(XLink_sem_inc(sem));
int ret = sem_trywait(&sem->psem);
XLINK_RET_IF_FAIL(XLink_sem_dec(sem));
return ret;
}
int XLink_sem_set_refs(XLink_sem_t* sem, int refs)
{
XLINK_RET_ERR_IF(sem == NULL, -1);

View File

@ -1413,7 +1413,7 @@ static void* debugConsoleThreadReader(void* ctx) {
fprintfsock(connfd, "=========================================\n");
while(1){
// use 0 as the timeout to prevent trigger false reset
xerr = XLinkReadDataWithTimeOut(streamId, &packet, 0);
xerr = XLinkReadDataWithTimeout(streamId, &packet, 0);
if(X_LINK_SUCCESS != xerr || packet == NULL)
break;
fprintfsock(connfd, NULL, packet->data, packet->length);

View File

@ -174,7 +174,7 @@ static ncStatus_t patchSetWdSwitchCommand(char **firmware, size_t *length, const
// 0x98 the write command for 8bit
// {0x00, 0x0c, 0x20, 0x70} == 0x70200c00 the address of memory type for ddrInit application
const char g_setMemTypeCommandMX[] = {0x98, 0x00, 0x0c, 0x20, 0x70};
const char g_callCommand[] = {0xba, 0x78, 0xe9, 0x00, 0x70};
const char g_callCommand[] = {0xba, 0xd0, 0xe9, 0x00, 0x70};
static ncStatus_t patchSetMemTypeCommand(char **firmware, size_t *length, const char memType) {
CHECK_HANDLE_CORRECT(firmware);