- use separate task to handle flac data coming from http_get_task()

Signed-off-by: Karl Osterseher <karli_o@gmx.at>
This commit is contained in:
Karl Osterseher
2022-12-28 17:07:09 +01:00
Unverified
parent b4d4215630
commit e79b17cbb7
2 changed files with 166 additions and 34 deletions

View File

@@ -81,10 +81,11 @@ const char *VERSION_STRING = "0.0.2";
#define OTA_TASK_PRIORITY 6
#define OTA_TASK_CORE_ID tskNO_AFFINITY // 1 // tskNO_AFFINITY
#define FLAC_DECODER_TASK_PRIORITY HTTP_TASK_PRIORITY
#define FLAC_DECODER_TASK_CORE_ID HTTP_TASK_CORE_ID // 1 // tskNO_AFFINITY
#define FLAC_DECODER_TASK_PRIORITY 7 // HTTP_TASK_PRIORITY
#define FLAC_DECODER_TASK_CORE_ID \
tskNO_AFFINITY // HTTP_TASK_CORE_ID // 1 // tskNO_AFFINITY
#define FLAC_TASK_PRIORITY 7
#define FLAC_TASK_PRIORITY 8
#define FLAC_TASK_CORE_ID tskNO_AFFINITY // 1 // tskNO_AFFINITY
xTaskHandle t_ota_task = NULL;
@@ -128,6 +129,8 @@ dspFlows_t dspFlow = dspfBiamp;
#endif
typedef struct flacData_s {
uint32_t type; // should be SNAPCAST_MESSAGE_CODEC_HEADER
// or SNAPCAST_MESSAGE_WIRE_CHUNK
char *inData;
tv_t timestamp;
pcm_chunk_message_t *outData;
@@ -226,6 +229,29 @@ void time_sync_msg_cb(void *args) {
// }
}
/**
*
*/
void free_flac_data(flacData_t *pFlacData) {
if (pFlacData->inData) {
free(pFlacData->inData);
pFlacData->inData = NULL;
}
if (pFlacData->outData) {
free(pFlacData->outData);
pFlacData->outData = NULL;
}
if (pFlacData) {
free(pFlacData);
pFlacData = NULL;
}
}
/**
*
*/
static FLAC__StreamDecoderReadStatus read_callback(
const FLAC__StreamDecoder *decoder, FLAC__byte buffer[], size_t *bytes,
void *client_data) {
@@ -236,14 +262,17 @@ static FLAC__StreamDecoderReadStatus read_callback(
xQueueReceive(decoderReadQHdl, &flacData, portMAX_DELAY);
// ESP_LOGI(TAG, "in flac read cb %d %p", flacData->bytes,
// flacData->inData);
// ESP_LOGI(TAG, "in flac read cb %d %p", flacData->bytes, flacData->inData);
if (flacData->bytes <= 0) {
free_flac_data(flacData);
return FLAC__STREAM_DECODER_READ_STATUS_END_OF_STREAM;
}
if (flacData->inData == NULL) {
free_flac_data(flacData);
return FLAC__STREAM_DECODER_READ_STATUS_ABORT;
}
@@ -253,14 +282,18 @@ static FLAC__StreamDecoderReadStatus read_callback(
// ESP_LOGW(TAG, "read all flac inData %d", *bytes);
} else {
memcpy(buffer, flacData->inData, *bytes);
ESP_LOGW(TAG, "dind't read all flac inData %d", *bytes);
// ESP_LOGW(TAG, "dind't read all flac inData %d", *bytes);
flacData->inData += *bytes;
flacData->bytes -= *bytes;
}
free_flac_data(flacData);
// xQueueSend (flacReadQHdl, &flacData, portMAX_DELAY);
xSemaphoreGive(decoderReadSemaphore);
// xSemaphoreGive(decoderReadSemaphore);
// ESP_LOGE(TAG, "%s: data processed", __func__);
return FLAC__STREAM_DECODER_READ_STATUS_CONTINUE;
}
@@ -271,7 +304,7 @@ static FLAC__StreamDecoderWriteStatus write_callback(
const FLAC__StreamDecoder *decoder, const FLAC__Frame *frame,
const FLAC__int32 *const buffer[], void *client_data) {
size_t i;
flacData_t *flacData = &flacOutData;
flacData_t *flacData = NULL; // = &flacOutData;
snapcastSetting_t *scSet = (snapcastSetting_t *)client_data;
int ret = 0;
uint32_t tmpData;
@@ -279,7 +312,7 @@ static FLAC__StreamDecoderWriteStatus write_callback(
(void)decoder;
xSemaphoreTake(decoderWriteSemaphore, portMAX_DELAY);
// xSemaphoreTake(decoderWriteSemaphore, portMAX_DELAY);
// xQueueReceive (flacReadQHdl, &flacData, portMAX_DELAY);
@@ -309,6 +342,13 @@ static FLAC__StreamDecoderWriteStatus write_callback(
return FLAC__STREAM_DECODER_WRITE_STATUS_ABORT;
}
flacData = (flacData_t *)malloc(sizeof(flacData_t));
if (flacData == NULL) {
return FLAC__STREAM_DECODER_WRITE_STATUS_ABORT;
}
memset(flacData, 0, sizeof(flacData_t));
flacData->bytes = frame->header.blocksize * frame->header.channels *
(frame->header.bits_per_sample / 8);
@@ -359,6 +399,8 @@ static FLAC__StreamDecoderWriteStatus write_callback(
xQueueSend(decoderWriteQHdl, &flacData, portMAX_DELAY);
// ESP_LOGE(TAG, "%s: data processed", __func__);
// xSemaphoreGive(flacWriteSemaphore);
return FLAC__STREAM_DECODER_WRITE_STATUS_CONTINUE;
@@ -367,7 +409,7 @@ static FLAC__StreamDecoderWriteStatus write_callback(
void metadata_callback(const FLAC__StreamDecoder *decoder,
const FLAC__StreamMetadata *metadata,
void *client_data) {
flacData_t *flacData = &flacOutData;
flacData_t *flacData; // = &flacOutData;
snapcastSetting_t *scSet = (snapcastSetting_t *)client_data;
(void)decoder;
@@ -377,12 +419,26 @@ void metadata_callback(const FLAC__StreamDecoder *decoder,
if (metadata->type == FLAC__METADATA_TYPE_STREAMINFO) {
// ESP_LOGI(TAG, "in flac meta cb");
flacData = (flacData_t *)malloc(sizeof(flacData_t));
if (flacData == NULL) {
ESP_LOGE(TAG, "in flac meta cb, malloc failed");
return;
}
memset(flacData, 0, sizeof(flacData_t));
// save for later
scSet->sr = metadata->data.stream_info.sample_rate;
scSet->ch = metadata->data.stream_info.channels;
scSet->bits = metadata->data.stream_info.bits_per_sample;
ESP_LOGI(TAG, "fLaC sampleformat: %d:%d:%d", scSet->sr, scSet->bits,
scSet->ch);
xQueueSend(decoderWriteQHdl, &flacData, portMAX_DELAY);
// ESP_LOGE(TAG, "%s: data processed", __func__);
}
// xSemaphoreGive(flacReadSemaphore);
@@ -482,26 +538,34 @@ void flac_task(void *pvParameters) {
// (uint64_t)currentTimestamp.sec * 1000000 +
// (uint64_t)currentTimestamp.usec);
xSemaphoreTake(decoderReadSemaphore, portMAX_DELAY);
// xSemaphoreTake(decoderReadSemaphore, portMAX_DELAY);
// send data to flac decoder
// ESP_LOGE(TAG, "%s: decoderReadQHdl start", __func__);
xQueueSend(decoderReadQHdl, &pFlacData, portMAX_DELAY);
// ESP_LOGE(TAG, "%s: decoderReadQHdl done", __func__);
// and wait until data was
// processed
xSemaphoreTake(decoderReadSemaphore, portMAX_DELAY);
// xSemaphoreTake(decoderReadSemaphore, portMAX_DELAY);
// need to release mutex
// afterwards for next round
xSemaphoreGive(decoderReadSemaphore);
// xSemaphoreGive(decoderReadSemaphore);
free(pFlacData->inData);
free(pFlacData);
// free(pFlacData->inData);
// free(pFlacData);
} else {
pcm_chunk_message_t *pcmData = NULL;
xSemaphoreGive(decoderWriteSemaphore);
// xSemaphoreGive(decoderWriteSemaphore);
// and wait until it is done
// ESP_LOGE(TAG, "%s: decoderWriteQHdl start", __func__);
xQueueReceive(decoderWriteQHdl, &pFlacData, portMAX_DELAY);
// ESP_LOGE(TAG, "%s: decoderWriteQHdl done", __func__);
if (pFlacData->outData != NULL) {
pcmData = pFlacData->outData;
pcmData->timestamp = currentTimestamp;
@@ -522,12 +586,39 @@ void flac_task(void *pvParameters) {
}
#if CONFIG_USE_DSP_PROCESSOR
dsp_setup_flow(500, scSet->sr, scSet->chkInFrames);
if (flow_drain_counter > 0) {
flow_drain_counter--;
double dynamic_vol =
((double)scSet.volume / 100 / (20 - flow_drain_counter));
if (flow_drain_counter == 0) {
#if SNAPCAST_USE_SOFT_VOL
dynamic_vol = 0;
#else
dynamic_vol = 1;
#endif
audio_hal_set_mute(board_handle->audio_hal,
server_settings_message.muted);
}
dsp_set_vol(dynamic_vol);
}
dsp_setup_flow(500, scSet.sr, scSet.chkInFrames);
dsp_processor(pcmData->fragment->payload, pcmData->fragment->size,
dspFlow);
#endif
insert_pcm_chunk(pcmData);
if (pFlacData->inData) {
free(pFlacData->inData);
pFlacData->inData = NULL;
}
if (pFlacData) {
free(pFlacData);
pFlacData = NULL;
}
} else {
free_flac_data(pFlacData);
}
}
}
@@ -556,7 +647,8 @@ static void http_get_task(void *pvParameters) {
OpusDecoder *opusDecoder = NULL;
codec_type_t codec = NONE;
snapcastSetting_t scSet;
flacData_t flacData = {NULL, {0, 0}, NULL, 0};
// flacData_t flacData = {SNAPCAST_MESSAGE_CODEC_HEADER, NULL, {0, 0}, NULL,
// 0};
flacData_t *pFlacData;
pcm_chunk_message_t *pcmData = NULL;
ip_addr_t remote_ip;
@@ -808,7 +900,7 @@ static void http_get_task(void *pvParameters) {
firstNetBuf = NULL;
#define TEST_DECODER_TASK 0
#define TEST_DECODER_TASK 1
decoderWriteSemaphore = xSemaphoreCreateMutex();
xSemaphoreTake(decoderWriteSemaphore, portMAX_DELAY);
@@ -1206,13 +1298,13 @@ static void http_get_task(void *pvParameters) {
pFlacData =
(flacData_t *)malloc(sizeof(flacData_t));
pFlacData->bytes = tmp;
pFlacData->timestamp =
wire_chnk.timestamp; // store timestamp for
// later use
pFlacData->inData =
(char *)malloc(tmp * sizeof(char));
// store timestamp for
// later use
pFlacData->timestamp = wire_chnk.timestamp;
pFlacData->inData = (char *)malloc(tmp);
memcpy(pFlacData->inData, start, tmp);
pFlacData->outData = NULL;
pFlacData->type = SNAPCAST_MESSAGE_WIRE_CHUNK;
// send data to seperate task which will handle this
xQueueSend(flacTaskQHdl, &pFlacData, portMAX_DELAY);
@@ -1400,8 +1492,12 @@ static void http_get_task(void *pvParameters) {
pFlacData = NULL; // send NULL so we know to wait
// for decoded data in task
// ESP_LOGE(TAG, "%s: flacTaskQHdl start
// wireChnk", __func__);
xQueueSend(flacTaskQHdl, &pFlacData,
portMAX_DELAY);
// ESP_LOGE(TAG, "%s: flacTaskQHdl stop wireChnk",
// __func__);
#else
xSemaphoreGive(decoderWriteSemaphore);
// and wait until it is done
@@ -1812,6 +1908,48 @@ static void http_get_task(void *pvParameters) {
FLAC_DECODER_TASK_CORE_ID);
}
#if TEST_DECODER_TASK
if (t_flac_task == NULL) {
xTaskCreatePinnedToCore(
&flac_task, "flac_task", 9 * 256, &scSet,
FLAC_TASK_PRIORITY, &t_flac_task,
FLAC_TASK_CORE_ID);
}
pFlacData = (flacData_t *)malloc(sizeof(flacData_t));
memset(pFlacData, 0, sizeof(flacData_t));
pFlacData->bytes = typedMsgLen;
pFlacData->inData = (char *)malloc(typedMsgLen);
memcpy(pFlacData->inData, tmp, typedMsgLen);
pFlacData->outData = NULL;
pFlacData->type = SNAPCAST_MESSAGE_CODEC_HEADER;
// TODO: find a smarter way for
// this wait for task creation done
// maybe use task notification
while (flacTaskQHdl == NULL) {
vTaskDelay(10);
}
// ESP_LOGE(TAG, "%s: flacTaskQHdl start codec
// header", __func__);
// send codec header to flac decoder
xQueueSend(flacTaskQHdl, &pFlacData, portMAX_DELAY);
// ESP_LOGE(TAG, "sent codec header");
// send NULL so we know to wait
// for decoded data in task
pFlacData = NULL;
xQueueSend(flacTaskQHdl, &pFlacData, portMAX_DELAY);
// ESP_LOGE(TAG, "%s: flacTaskQHdl done codec header",
// __func__);
#else
if (flacData.outData != NULL) {
free(flacData.outData);
flacData.outData = NULL;
@@ -1844,13 +1982,6 @@ static void http_get_task(void *pvParameters) {
ESP_LOGI(TAG, "fLaC sampleformat: %d:%d:%d", scSet.sr,
scSet.bits, scSet.ch);
#if TEST_DECODER_TASK
if (t_flac_task == NULL) {
xTaskCreatePinnedToCore(
&flac_task, "flac_task", 9 * 256, &scSet,
FLAC_TASK_PRIORITY, &t_flac_task,
FLAC_TASK_CORE_ID);
}
#endif
} else if (codec == PCM) {
memcpy(&channels, tmp + 22, sizeof(channels));
@@ -2098,7 +2229,7 @@ static void http_get_task(void *pvParameters) {
}
internalState++;
// no break
// fall through
}
case 5: {
@@ -2336,7 +2467,8 @@ static void http_get_task(void *pvParameters) {
player_latency_insert(tmpDiffToServer);
ESP_LOGI(TAG, "Current latency:%lld:", tmpDiffToServer);
// ESP_LOGI(TAG, "Current latency:%lld:",
// tmpDiffToServer);
// store current time
lastTimeSync = now;