[ICV][XLink] - port XLink changes from mdk (#10212)

-76384
Port changes from MDK
This commit is contained in:
Daria Mityagina 2022-03-31 09:50:26 +03:00 committed by GitHub
parent 24a74672f6
commit 5c917cfaaa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 185 additions and 74 deletions

View File

@ -6,14 +6,14 @@ include_guard(GLOBAL)
set(VPU_SUPPORTED_FIRMWARES usb-ma2x8x pcie-ma2x8x)
set(VPU_SUPPORTED_FIRMWARES_HASH
"e65fcc1c6b0f3e9d814e53022c212ec0a2b83197a9df38badb298fb85ccf3acf"
"b11368fec2036d96fb703d2a40b171184fefe89f27e74a988ef1ca34260a2bc5")
"877c4e1616d14a94dd2764f4f32f1c1aa2180dcd64ad1823b31efdc3f56ad593"
"aabff3d817431792ef9e17056448979c2cdbb484ad4b0af9e68cb874ee10eef5")
#
# Default packages
#
set(FIRMWARE_PACKAGE_VERSION 1875)
set(FIRMWARE_PACKAGE_VERSION 20220307_34)
set(VPU_CLC_MA2X8X_VERSION "movi-cltools-20.09.2")
#

View File

@ -205,7 +205,6 @@ XLinkError_t XLinkReadDataWithTimeout(streamId_t streamId, streamPacketDesc_t**
/**
* @brief Sends a package to initiate the writing of data to a remote stream with timeout in ms
* XLinkWriteDataWithTimeout is not fully supported yet. The XLinkWriteData method is called instead.
* @warning Actual size of the written data is ALIGN_UP(size, 64)
* @param[in] streamId stream link Id obtained from XLinkOpenStream call
* @param[in] buffer data buffer to be transmitted

View File

@ -36,13 +36,13 @@ int DispatcherWaitEventComplete(xLinkDeviceHandle_t *deviceHandle, unsigned int
char* TypeToStr(int type);
int DispatcherUnblockEvent(eventId_t id,
xLinkEventType_t type,
streamId_t stream,
void *xlinkFD);
int DispatcherServeEvent(eventId_t id,
xLinkEventType_t type,
streamId_t stream,
void *xlinkFD);
xLinkEventType_t type,
streamId_t stream,
void *xlinkFD);
int DispatcherServeOrDropEvent(eventId_t id,
xLinkEventType_t type,
streamId_t stream,
void *xlinkFD);
#ifdef __cplusplus
}
#endif

View File

@ -84,6 +84,7 @@ typedef enum
XLINK_CLOSE_STREAM_REQ,
XLINK_PING_REQ,
XLINK_RESET_REQ,
XLINK_DROP_REQ,
XLINK_REQUEST_LAST,
//note that is important to separate request and response
XLINK_WRITE_RESP,
@ -94,6 +95,7 @@ typedef enum
XLINK_CLOSE_STREAM_RESP,
XLINK_PING_RESP,
XLINK_RESET_RESP,
XLINK_DROP_RESP,
XLINK_RESP_LAST,
/*X_LINK_IPC related events*/
@ -130,6 +132,8 @@ typedef struct xLinkEventHeader_t{
char streamName[MAX_STREAM_NAME_LENGTH];
streamId_t streamId;
uint32_t size;
uint32_t dropped;
uint32_t canBeServed;
union{
uint32_t raw;
struct{
@ -155,6 +159,8 @@ typedef struct xLinkEvent_t {
(event).header.streamId = (in_streamId); \
(event).header.type = (in_type); \
(event).header.size = (in_size); \
(event).header.dropped = 0; \
(event).header.canBeServed = 1; \
(event).data = (in_data); \
(event).deviceHandle = (in_deviceHandle); \
} while(0)

View File

@ -182,8 +182,7 @@ XLinkError_t XLinkWriteDataWithTimeout(streamId_t streamId, const uint8_t* buffe
XLINK_INIT_EVENT(event, streamId, XLINK_WRITE_REQ,
size,(void*)buffer, link->deviceHandle);
mvLog(MVLOG_WARN,"XLinkWriteDataWithTimeout is not fully supported yet. The XLinkWriteData method is called instead. Desired timeout = %d\n", timeoutMs);
XLINK_RET_IF_FAIL(addEventWithPerf(&event, &opTime, XLINK_NO_RW_TIMEOUT));
XLINK_RET_IF_FAIL(addEventWithPerf(&event, &opTime, timeoutMs));
if( glHandler->profEnable) {
glHandler->profilingData.totalWriteBytes += size;
@ -322,10 +321,10 @@ XLinkError_t addEvent(xLinkEvent_t *event, unsigned int timeoutMs)
TypeToStr(event->header.type), event->header.id, event->header.streamName);
return X_LINK_ERROR;
}
event->header.canBeServed = 1;
if (timeoutMs != XLINK_NO_RW_TIMEOUT) {
ASSERT_XLINK(event->header.type == XLINK_READ_REQ);
xLinkDesc_t* link;
xLinkDesc_t* link = NULL;
XLINK_RET_IF(getLinkByStreamId(event->header.streamId, &link));
if (DispatcherWaitEventComplete(&event->deviceHandle, timeoutMs)) // timeout reached
@ -333,17 +332,28 @@ XLinkError_t addEvent(xLinkEvent_t *event, unsigned int timeoutMs)
streamDesc_t* stream = getStreamById(event->deviceHandle.xLinkFD,
event->header.streamId);
ASSERT_XLINK(stream);
event->header.dropped = 1;
if (event->header.type == XLINK_READ_REQ)
{
// XLINK_READ_REQ is a local event. It is safe to serve it.
// Limitations.
// Possible vulnerability in this mechanism:
// If we reach timeout with DispatcherWaitEventComplete and before
// we call DispatcherServeEvent, the event actually comes,
// and gets served by XLink stack and event semaphore is posted.
DispatcherServeEvent(event->header.id, XLINK_READ_REQ, stream->id, event->deviceHandle.xLinkFD);
event->header.canBeServed = 0;
XLINK_RET_IF(DispatcherServeOrDropEvent(event->header.id, XLINK_READ_REQ, stream->id, event->deviceHandle.xLinkFD));
}
else if (event->header.type == XLINK_WRITE_REQ)
{
event->header.canBeServed = 0;
XLINK_RET_IF(DispatcherServeOrDropEvent(event->header.id, XLINK_WRITE_REQ, stream->id, event->deviceHandle.xLinkFD));
}
releaseStream(stream);
if (event->header.type == XLINK_WRITE_REQ && event->header.dropped)
{
mvLog(MVLOG_ERROR,"event is dropped\n");
xLinkEvent_t dropEvent = {0};
dropEvent.header.streamId = EXTRACT_STREAM_ID(event->header.streamId);
XLINK_INIT_EVENT(dropEvent, event->header.streamId, XLINK_DROP_REQ,
0, NULL, link->deviceHandle);
DispatcherAddEvent(EVENT_LOCAL, &dropEvent);
XLINK_RET_ERR_IF(DispatcherWaitEventComplete(&link->deviceHandle, XLINK_NO_RW_TIMEOUT), dropEvent.header.streamId);
}
return X_LINK_TIMEOUT;
}

View File

@ -50,6 +50,7 @@ typedef enum {
EVENT_BLOCKED,
EVENT_READY,
EVENT_SERVED,
EVENT_DROPPED,
} xLinkEventState_t;
typedef struct xLinkEventPriv_t {
@ -446,6 +447,7 @@ char* TypeToStr(int type)
case XLINK_CLOSE_STREAM_REQ: return "XLINK_CLOSE_STREAM_REQ";
case XLINK_PING_REQ: return "XLINK_PING_REQ";
case XLINK_RESET_REQ: return "XLINK_RESET_REQ";
case XLINK_DROP_REQ: return "XLINK_DROP_REQ";
case XLINK_REQUEST_LAST: return "XLINK_REQUEST_LAST";
case XLINK_WRITE_RESP: return "XLINK_WRITE_RESP";
case XLINK_READ_RESP: return "XLINK_READ_RESP";
@ -455,6 +457,7 @@ char* TypeToStr(int type)
case XLINK_CLOSE_STREAM_RESP: return "XLINK_CLOSE_STREAM_RESP";
case XLINK_PING_RESP: return "XLINK_PING_RESP";
case XLINK_RESET_RESP: return "XLINK_RESET_RESP";
case XLINK_DROP_RESP: return "XLINK_DROP_RESP";
case XLINK_RESP_LAST: return "XLINK_RESP_LAST";
default:
break;
@ -462,6 +465,59 @@ char* TypeToStr(int type)
return "";
}
int DispatcherServeOrDropEvent(eventId_t id, xLinkEventType_t type, streamId_t stream, void *xlinkFD)
{
xLinkSchedulerState_t* curr = findCorrespondingScheduler(xlinkFD);
ASSERT_XLINK(curr != NULL);
xLinkEventPriv_t* event;
int blocked = 0;
XLINK_RET_ERR_IF(pthread_mutex_lock(&(curr->queueMutex)) != 0, 1);
for (event = curr->lQueue.q;
event < curr->lQueue.q + MAX_EVENTS;
event++)
{
if (((event->packet.header.id == id || id == -1)
&& event->packet.header.type == type
&& event->packet.header.streamId == stream))
{
blocked = event->isServed == EVENT_BLOCKED;
if (event->packet.header.type == XLINK_READ_REQ) {
mvLog(MVLOG_DEBUG,"served read request**************** %d %s\n",
(int)event->packet.header.id,
TypeToStr((int)event->packet.header.type));
event->isServed = EVENT_SERVED;
event->packet.header.dropped = 0;
event->packet.header.canBeServed = 1;
XLINK_RET_ERR_IF(pthread_mutex_unlock(&(curr->queueMutex)) != 0, 1);
return 0;
} else if (event->packet.header.type == XLINK_WRITE_REQ) {
if (blocked) {
mvLog(MVLOG_DEBUG,"served write request**************** %d %s\n",
(int)event->packet.header.id,
TypeToStr((int)event->packet.header.type));
event->isServed = EVENT_SERVED;
event->packet.header.dropped = 0;
event->packet.header.canBeServed = 1;
XLINK_RET_ERR_IF(pthread_mutex_unlock(&(curr->queueMutex)) != 0, 1);
return 0;
} else {
mvLog(MVLOG_DEBUG,"droped write request**************** %d %s\n",
(int)event->packet.header.id,
TypeToStr((int)event->packet.header.type));
event->isServed = EVENT_DROPPED;
event->packet.header.dropped = 1;
event->packet.header.canBeServed = 1;
XLINK_RET_ERR_IF(pthread_mutex_unlock(&(curr->queueMutex)) != 0, 1);
return 0;
}
}
}
}
XLINK_RET_ERR_IF(pthread_mutex_unlock(&(curr->queueMutex)) != 0, 1);
return 0;
}
int DispatcherUnblockEvent(eventId_t id, xLinkEventType_t type, streamId_t stream, void *xlinkFD)
{
xLinkSchedulerState_t* curr = findCorrespondingScheduler(xlinkFD);
@ -499,33 +555,6 @@ int DispatcherUnblockEvent(eventId_t id, xLinkEventType_t type, streamId_t strea
return 0;
}
int DispatcherServeEvent(eventId_t id, xLinkEventType_t type, streamId_t stream, void *xlinkFD)
{
xLinkSchedulerState_t* curr = findCorrespondingScheduler(xlinkFD);
ASSERT_XLINK(curr != NULL);
xLinkEventPriv_t* event;
XLINK_RET_ERR_IF(pthread_mutex_lock(&(curr->queueMutex)) != 0, 1);
for (event = curr->lQueue.q;
event < curr->lQueue.q + MAX_EVENTS;
event++)
{
if (((event->packet.header.id == id || id == -1)
&& event->packet.header.type == type
&& event->packet.header.streamId == stream))
{
mvLog(MVLOG_DEBUG,"served**************** %d %s\n",
(int)event->packet.header.id,
TypeToStr((int)event->packet.header.type));
event->isServed = EVENT_SERVED;
XLINK_RET_ERR_IF(pthread_mutex_unlock(&(curr->queueMutex)) != 0, 1);
return 1;
}
}
XLINK_RET_ERR_IF(pthread_mutex_unlock(&(curr->queueMutex)) != 0, 1);
return 0;
}
// ------------------------------------
// XLinkDispatcher.h implementation. End.
// ------------------------------------
@ -645,10 +674,19 @@ static void* eventReader(void* ctx)
DispatcherAddEvent(EVENT_REMOTE, &event);
if (event.header.type == XLINK_RESET_REQ) {
curr->resetXLink = 1;
mvLog(MVLOG_DEBUG,"Read XLINK_RESET_REQ, stopping eventReader thread.");
#ifdef __PC__
// Stop receiving events when receive confirmation that the device acknowledged the reset request
if (event.header.type == XLINK_RESET_RESP) {
mvLog(MVLOG_DEBUG,"Read XLINK_RESET_RESP, stopping eventReader thread.");
break;
}
#else
// Stop receiving events from remote when receive a XLINK_RESET_REQ
if (event.header.type == XLINK_RESET_REQ) {
mvLog(MVLOG_DEBUG,"Read XLINK_RESET_REQ, stopping eventReader thread.");
break;
}
#endif
}
return 0;
@ -751,13 +789,13 @@ static int isEventTypeRequest(xLinkEventPriv_t* event)
static void postAndMarkEventServed(xLinkEventPriv_t *event)
{
if (event->retEv){
if (event->retEv) {
// the xLinkEventPriv_t slot pointed by "event" will be
// re-cycled as soon as we mark it as EVENT_SERVED,
// so before that, we copy the result event into XLink API layer
*(event->retEv) = event->packet;
}
if(event->sem){
if (event->sem && event->isServed != EVENT_DROPPED) {
if (XLink_sem_post(event->sem)) {
mvLog(MVLOG_ERROR,"can't post semaphore\n");
}
@ -811,7 +849,7 @@ static int dispatcherRequestServe(xLinkEventPriv_t * event, xLinkSchedulerState_
xLinkEventHeader_t *header = &event->packet.header;
if (header->flags.bitField.block){ //block is requested
event->isServed = EVENT_BLOCKED;
} else if(header->flags.bitField.localServe == 1 ||
} else if (header->flags.bitField.localServe == 1 ||
(header->flags.bitField.ack == 0
&& header->flags.bitField.nack == 1)){ //this event is served locally, or it is failed
postAndMarkEventServed(event);
@ -820,7 +858,7 @@ static int dispatcherRequestServe(xLinkEventPriv_t * event, xLinkSchedulerState_
event->isServed = EVENT_PENDING;
mvLog(MVLOG_DEBUG,"------------------------UNserved %s\n",
TypeToStr(event->packet.header.type));
}else{
} else {
return 1;
}
return 0;
@ -836,16 +874,31 @@ static int dispatcherResponseServe(xLinkEventPriv_t * event, xLinkSchedulerState
xLinkEventHeader_t *header = &curr->lQueue.q[i].packet.header;
xLinkEventHeader_t *evHeader = &event->packet.header;
if (curr->lQueue.q[i].isServed == EVENT_PENDING &&
header->id == evHeader->id &&
header->type == evHeader->type - XLINK_REQUEST_LAST -1)
if (header->id == evHeader->id &&
header->type == evHeader->type - XLINK_REQUEST_LAST - 1)
{
mvLog(MVLOG_DEBUG,"----------------------ISserved %s\n",
TypeToStr(header->type));
//propagate back flags
header->flags = evHeader->flags;
postAndMarkEventServed(&curr->lQueue.q[i]);
break;
if (curr->lQueue.q[i].packet.header.type == XLINK_WRITE_REQ ||
curr->lQueue.q[i].packet.header.type == XLINK_READ_REQ) {
while (!curr->lQueue.q[i].packet.header.canBeServed) continue;
}
if (curr->lQueue.q[i].isServed == EVENT_PENDING) {
mvLog(MVLOG_DEBUG,"----------------------ISserved %s\n",
TypeToStr(header->type));
//propagate back flags
header->flags = evHeader->flags;
postAndMarkEventServed(&curr->lQueue.q[i]);
break;
} else if (curr->lQueue.q[i].isServed == EVENT_DROPPED) {
mvLog(MVLOG_DEBUG,"----------------------ISdropped %s\n",
TypeToStr(header->type));
streamDesc_t* stream = getStreamById(event->packet.deviceHandle.xLinkFD,
event->packet.header.streamId);
if (stream->remoteFillLevel)
stream->remoteFillLevel -= event->packet.header.size;
releaseStream(stream);
postAndMarkEventServed(&curr->lQueue.q[i]);
break;
}
}
}
if (i == MAX_EVENTS) {
@ -1149,6 +1202,13 @@ static XLinkError_t sendEvents(xLinkSchedulerState_t* curr) {
XLINK_RET_ERR_IF(pthread_mutex_unlock(&(curr->queueMutex)) != 0, X_LINK_ERROR);
mvLog(MVLOG_ERROR, "Event sending failed");
}
#ifndef __PC__
// Stop scheduler thread after XLINK_RESET_RESP was successfully sent to host
if (toSend->header.type == XLINK_RESET_RESP) {
curr->resetXLink = 1;
mvLog(MVLOG_DEBUG, "Stop scheduler thread.");
}
#endif
} else {
XLINK_RET_ERR_IF(pthread_mutex_unlock(&(curr->queueMutex)) != 0, X_LINK_ERROR);
}

View File

@ -169,7 +169,8 @@ int dispatcherLocalEventGetResponse(xLinkEvent_t* event, xLinkEvent_t* response)
ASSERT_XLINK(stream);
XLINK_EVENT_ACKNOWLEDGE(event);
uint32_t releasedSize = 0;
releasePacketFromStream(stream, &releasedSize);
if (stream->blockedPackets)
releasePacketFromStream(stream, &releasedSize);
event->header.size = releasedSize;
releaseStream(stream);
break;
@ -231,6 +232,11 @@ int dispatcherLocalEventGetResponse(xLinkEvent_t* event, xLinkEvent_t* response)
mvLog(MVLOG_DEBUG,"XLINK_PING_REQ - do nothing\n");
break;
}
case XLINK_DROP_REQ:
{
XLINK_EVENT_ACKNOWLEDGE(event);
break;
}
case XLINK_WRITE_RESP:
case XLINK_READ_RESP:
case XLINK_READ_REL_RESP:
@ -243,6 +249,8 @@ int dispatcherLocalEventGetResponse(xLinkEvent_t* event, xLinkEvent_t* response)
//should not happen
event->header.flags.bitField.localServe = 1;
break;
case XLINK_DROP_RESP:
break;
default:
{
mvLog(MVLOG_ERROR,
@ -271,16 +279,22 @@ int dispatcherRemoteEventGetResponse(xLinkEvent_t* event, xLinkEvent_t* response
response->header.size = event->header.size;
response->header.streamId = event->header.streamId;
response->deviceHandle = event->deviceHandle;
stream = getStreamById(event->deviceHandle.xLinkFD,
event->header.streamId);
ASSERT_XLINK(stream);
XLINK_EVENT_ACKNOWLEDGE(response);
// we got some data. We should unblock a blocked read
int xxx = DispatcherUnblockEvent(-1,
XLINK_READ_REQ,
response->header.streamId,
event->deviceHandle.xLinkFD);
(void) xxx;
mvLog(MVLOG_DEBUG,"unblocked from stream %d %d\n",
(int)response->header.streamId, (int)xxx);
if (!event->header.dropped)
{
// we got some data. We should unblock a blocked read
int xxx = DispatcherUnblockEvent(-1,
XLINK_READ_REQ,
response->header.streamId,
event->deviceHandle.xLinkFD);
mvLog(MVLOG_DEBUG,"unblocked from stream %d %d\n",
(int)response->header.streamId, (int)xxx);
}
releaseStream(stream);
}
break;
case XLINK_READ_REQ:
@ -288,6 +302,8 @@ int dispatcherRemoteEventGetResponse(xLinkEvent_t* event, xLinkEvent_t* response
case XLINK_READ_REL_SPEC_REQ:
XLINK_EVENT_ACKNOWLEDGE(response);
response->header.type = XLINK_READ_REL_SPEC_RESP;
response->header.size = event->header.size;
response->header.streamId = event->header.streamId;
response->deviceHandle = event->deviceHandle;
stream = getStreamById(event->deviceHandle.xLinkFD,
event->header.streamId);
@ -314,6 +330,8 @@ int dispatcherRemoteEventGetResponse(xLinkEvent_t* event, xLinkEvent_t* response
case XLINK_READ_REL_REQ:
XLINK_EVENT_ACKNOWLEDGE(response);
response->header.type = XLINK_READ_REL_RESP;
response->header.size = event->header.size;
response->header.streamId = event->header.streamId;
response->deviceHandle = event->deviceHandle;
stream = getStreamById(event->deviceHandle.xLinkFD,
event->header.streamId);
@ -422,6 +440,22 @@ int dispatcherRemoteEventGetResponse(xLinkEvent_t* event, xLinkEvent_t* response
response->deviceHandle = event->deviceHandle;
// need to send the response, serve the event and then reset
break;
case XLINK_DROP_REQ:
{
response->header.type = XLINK_DROP_RESP;
response->header.size = event->header.size;
response->header.streamId = event->header.streamId;
response->deviceHandle = event->deviceHandle;
XLINK_EVENT_ACKNOWLEDGE(response);
stream = getStreamById(event->deviceHandle.xLinkFD,
event->header.streamId);
uint32_t releasedSize = 0;
if (stream->blockedPackets)
releasePacketFromStream(stream, &releasedSize);
releaseStream(stream);
break;
}
case XLINK_WRITE_RESP:
break;
case XLINK_READ_RESP:
@ -470,6 +504,8 @@ int dispatcherRemoteEventGetResponse(xLinkEvent_t* event, xLinkEvent_t* response
break;
case XLINK_RESET_RESP:
break;
case XLINK_DROP_RESP:
break;
default:
{
mvLog(MVLOG_ERROR,