diff --git a/main/main.c b/main/main.c index c3710d3..a5e5f77 100644 --- a/main/main.c +++ b/main/main.c @@ -20,6 +20,8 @@ #include "mp3_decoder.h" #include "flac_decoder.h" +#include "auto_flac_dec.h" + #include "esp_peripherals.h" #include "periph_spiffs.h" #include "board.h" @@ -41,7 +43,7 @@ #define CONFIG_USE_SNTP 0 -#define DAC_OUT_BUFFER_TIME_US 3 * 1526LL //TODO: not sure about this... // @48kHz, 16bit samples, 2 channels and a DMA buffer length of 300 Byte and 3 buffers. 300 Byte / (48000 * 2 Byte * 2 channels) +#define DAC_OUT_BUFFER_TIME_US 0//3 * 1526LL //TODO: not sure about this... // @48kHz, 16bit samples, 2 channels and a DMA buffer length of 300 Byte and 3 buffers. 300 Byte / (48000 * 2 Byte * 2 channels) static const char *TAG = "SNAPCLIENT"; @@ -52,10 +54,10 @@ char *codecString = NULL; // configMAX_PRIORITIES - 1 // TODO: what are the best values here? -#define SYNC_TASK_PRIORITY 6 +#define SYNC_TASK_PRIORITY 12 #define SYNC_TASK_CORE_ID 1 -#define HTTP_TASK_PRIORITY 1 +#define HTTP_TASK_PRIORITY 6 #define HTTP_TASK_CORE_ID 0 #define I2S_TASK_PRIORITY 0 @@ -64,26 +66,35 @@ char *codecString = NULL; #define AGE_THRESHOLD 50LL // in µs -#define WIRE_CHNK_QUEUE_LENGTH 50 // TODO: one chunk is hardcoded to 20ms, change it to be dynamically adjustable. 1s buffer = 50 -static StaticQueue_t wireChunkQueue; -uint8_t wireChunkQueueStorageArea[ WIRE_CHNK_QUEUE_LENGTH * sizeof(wire_chunk_message_t *) ]; +QueueHandle_t timestampQueueHandle; +#define TIMESTAMP_QUEUE_LENGTH 150 // TODO: what's the minimum value needed here, although probably not that important because we create queue using xQueueCreate() +static StaticQueue_t timestampQueue; +uint8_t timestampQueueStorageArea[ TIMESTAMP_QUEUE_LENGTH * sizeof(tv_t) ]; + +QueueHandle_t pcmChunkQueueHandle; +#define PCM_CHNK_QUEUE_LENGTH 150 // TODO: one chunk is hardcoded to 20ms, change it to be dynamically adjustable. 1s buffer = 50 +static StaticQueue_t pcmChunkQueue; +uint8_t pcmChunkQueueStorageArea[ PCM_CHNK_QUEUE_LENGTH * sizeof(wire_chunk_message_t *) ]; + typedef struct snapcast_sync_task_cfg_s { - QueueHandle_t *sync_queue_handle; - audio_element_handle_t *p_raw_stream_reader; + audio_element_handle_t *p_raw_stream_writer; int64_t outputBufferDacTime_us; int64_t buffer_us; } snapcast_sync_task_cfg_t; +typedef struct http_task_cfg_s { + audio_element_handle_t *p_raw_stream_writer_to_decoder; + audio_element_handle_t *p_raw_stream_writer_to_i2s; +} http_task_cfg_t; + SemaphoreHandle_t diffBufSemaphoreHandle = NULL; static struct timeval diffToServer = {0, 0}; // median diff to server in µs static struct timeval diffBuf[50] = {0}; // collected diff's to server //static struct timeval *medianArray = NULL; // temp median calculation data is stored at this location static struct timeval medianArray[50] = {0}; // temp median calculation data is stored at this location -static int indexOldest = 0; -xQueueHandle i2s_queue; uint32_t buffer_ms = 400; uint8_t muteCH[4] = {0}; audio_board_handle_t board_handle; @@ -96,7 +107,7 @@ audio_board_handle_t board_handle; /* Constants that aren't configurable in menuconfig */ #define HOST "192.168.1.6" #define PORT 1704 -#define BUFF_LEN 4000 +#define BUFF_LEN 5000 unsigned int addr; uint32_t port = 0; /* Logging tag */ @@ -121,21 +132,22 @@ static EventGroupHandle_t s_wifi_event_group; static int s_retry_num = 0; -static void event_handler(void* arg, esp_event_base_t event_base, - int32_t event_id, void* event_data) -{ +static void event_handler(void* arg, esp_event_base_t event_base, int32_t event_id, void* event_data) { if (event_base == WIFI_EVENT && event_id == WIFI_EVENT_STA_START) { esp_wifi_connect(); - } else if (event_base == WIFI_EVENT && event_id == WIFI_EVENT_STA_DISCONNECTED) { + } + else if (event_base == WIFI_EVENT && event_id == WIFI_EVENT_STA_DISCONNECTED) { if (s_retry_num < 10) { esp_wifi_connect(); s_retry_num++; ESP_LOGI(TAG, "retry to connect to the AP"); - } else { + } + else { xEventGroupSetBits(s_wifi_event_group, WIFI_FAIL_BIT); } ESP_LOGI(TAG,"connect to the AP fail"); - } else if (event_base == IP_EVENT && event_id == IP_EVENT_STA_GOT_IP) { + } + else if (event_base == IP_EVENT && event_id == IP_EVENT_STA_GOT_IP) { ip_event_got_ip_t* event = (ip_event_got_ip_t*) event_data; ESP_LOGI(TAG, "got ip:" IPSTR, IP2STR(&event->ip_info.ip)); s_retry_num = 0; @@ -143,8 +155,7 @@ static void event_handler(void* arg, esp_event_base_t event_base, } } -void wifi_init_sta(void) -{ +void wifi_init_sta(void) { s_wifi_event_group = xEventGroupCreate(); ESP_ERROR_CHECK(esp_netif_init()); @@ -229,8 +240,8 @@ void mdns_print_results(mdns_result_t * results){ } } -void find_mdns_service(const char * service_name, const char * proto) -{ + +void find_mdns_service(const char * service_name, const char * proto) { ESP_LOGI(TAG, "Query PTR: %s.%s.local", service_name, proto); mdns_result_t * r = NULL; @@ -285,10 +296,8 @@ void QuickSort(struct timeval *a, int left, int right) { /** * */ -// TODO: find better implementation. Could be a performance issue? int8_t get_median( const struct timeval *tDiff, size_t n, struct timeval *result ) { - struct timeval median = {0 , 0}; - int i, j; + struct timeval median; if (tDiff == NULL) { ESP_LOGE(TAG, "get_median: buffer error"); @@ -333,15 +342,9 @@ int8_t get_median( const struct timeval *tDiff, size_t n, struct timeval *result /** * */ -// TODO: find a better way to do this! Especially the initialization/reset part -int8_t set_diff_to_server( struct timeval *tDiff, size_t n, uint8_t *index ) { +int8_t set_diff_to_server( struct timeval *tDiff, size_t len) { int8_t ret = -1; - struct timeval diff, now; struct timeval tmpDiffToServer; - static struct timeval lastTimeSync = { 0, 0 }; - static uint8_t bufferFull = false; - size_t len; - int i; if (diffBufSemaphoreHandle == NULL) { ESP_LOGE(TAG, "set_diff_to_server: mutex handle == NULL"); @@ -349,65 +352,13 @@ int8_t set_diff_to_server( struct timeval *tDiff, size_t n, uint8_t *index ) { return -1; } - // get current time - if (gettimeofday(&now, NULL)) { - ESP_LOGE(TAG, "set_diff_to_server: Failed to get time of day"); - - return -1; - } - - // clear diffBuffer if last update is older than a minute - diff.tv_sec = now.tv_sec - lastTimeSync.tv_sec; - diff.tv_usec = now.tv_usec - lastTimeSync.tv_usec; - if ( diff.tv_sec > 60 ) { - ESP_LOGW(TAG, "Last time sync older than a minute. Clearing time buffer"); - - for (i=0; i= (n - 1)) && (bufferFull == false)) { - bufferFull = true; - } - - // store current time for next run - lastTimeSync.tv_sec = now.tv_sec; - lastTimeSync.tv_usec = now.tv_usec; - - if (bufferFull == true) { - len = n; - } - else { - len = *index; - } - - //ESP_LOGI(TAG, "set_diff_to_server: index: %d, bufferFull %d", *index, bufferFull); - ret = get_median(tDiff, len, &tmpDiffToServer); if (ret < 0) { ESP_LOGW(TAG, "set_diff_to_server: get median failed"); } + //ESP_LOGI(TAG, "set_diff_to_server: median is %ld.%06ld", tmpDiffToServer.tv_sec, tmpDiffToServer.tv_usec); + if (xSemaphoreTake( diffBufSemaphoreHandle, 1 ) == pdFALSE) { ESP_LOGW(TAG, "set_diff_to_server: can't take semaphore"); @@ -428,6 +379,8 @@ int8_t get_diff_to_server( struct timeval *tDiff ) { static struct timeval lastDiff = { 0, 0 }; if (diffBufSemaphoreHandle == NULL) { + ESP_LOGE(TAG, "get_diff_to_server: diffBufSemaphoreHandle == NULL"); + return -1; } @@ -468,16 +421,15 @@ int8_t server_now( struct timeval *sNow ) { } if ((diff.tv_sec == 0) && (diff.tv_usec == 0)) { - ESP_LOGW(TAG, "server_now: diff to server not initialized yet"); + //ESP_LOGW(TAG, "server_now: diff to server not initialized yet"); return -1; } -// ESP_LOGI(TAG, "now: %lldus", (int64_t)now.tv_sec * 1000000LL + (int64_t)now.tv_usec); -// ESP_LOGI(TAG, "diff: %lldus", (int64_t)diff.tv_sec * 1000000LL + (int64_t)diff.tv_usec); - timeradd(&now, &diff, sNow); +// ESP_LOGI(TAG, "now: %lldus", (int64_t)now.tv_sec * 1000000LL + (int64_t)now.tv_usec); +// ESP_LOGI(TAG, "diff: %lldus", (int64_t)diff.tv_sec * 1000000LL + (int64_t)diff.tv_usec); // ESP_LOGI(TAG, "serverNow: %lldus", (int64_t)sNow->tv_sec * 1000000LL + (int64_t)sNow->tv_usec); return 0; @@ -486,7 +438,7 @@ int8_t server_now( struct timeval *sNow ) { static void snapcast_sync_task(void *pvParameters) { snapcast_sync_task_cfg_t *taskCfg = (snapcast_sync_task_cfg_t *)pvParameters; - wire_chunk_message_t *wireChnk = NULL; + wire_chunk_message_t *chnk = NULL; struct timeval serverNow = {0, 0}; int64_t age; BaseType_t ret; @@ -494,74 +446,70 @@ static void snapcast_sync_task(void *pvParameters) { ESP_LOGI(TAG, "started sync task"); -// ESP_LOGI(TAG, "tvAgeCompare1: %fms", (float)tvAgeCompare1.tv_sec * 1000.0 + (float)tvAgeCompare1.tv_usec / 1000.0); -// ESP_LOGI(TAG, "tvAgeCompare2: %fms", (float)tvAgeCompare2.tv_sec * 1000.0 + (float)tvAgeCompare2.tv_usec / 1000.0); -// ESP_LOGI(TAG, "tvAgeCompare3: %fms", (float)tvAgeCompare3.tv_sec * 1000.0 + (float)tvAgeCompare3.tv_usec / 1000.0); -// ESP_LOGI(TAG, "tvAgeCompare4: %fms", (float)tvAgeCompare4.tv_sec * 1000.0 + (float)tvAgeCompare4.tv_usec / 1000.0); - while(1) { - if (wireChnk == NULL) { - ret = xQueueReceive( *(taskCfg->sync_queue_handle), &wireChnk, portMAX_DELAY ); + if (chnk == NULL) { + ret = xQueueReceive(pcmChunkQueueHandle, &chnk, portMAX_DELAY ); } else { ret = pdPASS; } if( ret == pdPASS ) { - if (initial_sync > 5) { + if (initial_sync > 5) { // hard sync was successfull? if (server_now(&serverNow) >= 0) { age = ((int64_t)serverNow.tv_sec * 1000000LL + (int64_t)serverNow.tv_usec) - - ((int64_t)wireChnk->timestamp.sec * 1000000LL + (int64_t)wireChnk->timestamp.usec) - + ((int64_t)chnk->timestamp.sec * 1000000LL + (int64_t)chnk->timestamp.usec) - (int64_t)taskCfg->buffer_us + (int64_t)taskCfg->outputBufferDacTime_us; - //ESP_LOGI(TAG, "before age: %lldus", age); +// ESP_LOGI(TAG, "before age: %lldus", age); if (age < -(int64_t)taskCfg->buffer_us / 10LL) { // if age gets younger than 1/10 of buffer then do a hard resync initial_sync = 0; - ESP_LOGI(TAG, "need resync, skipping chnk. age: %lldus", age); + ESP_LOGW(TAG, "need resync, skipping chnk. age: %lldus", age); - wire_chunk_message_free(wireChnk); - free(wireChnk); - wireChnk = NULL; + free(chnk->payload); + free(chnk); + chnk = NULL; continue; } else if (age < -1099LL) { - vTaskDelay((TickType_t)(-age / 1000LL)); // TODO: find better way to do a exact delay to age, skipping probably goes away afterwards + //ESP_LOGI(TAG, "age: %lldus", age); + + vTaskDelay( pdMS_TO_TICKS(-age / 1000LL) ); //((TickType_t)(-age / 1000LL)); // TODO: find better way to do a exact delay to age, skipping probably goes away afterwards } else if (age > 0) { - ESP_LOGI(TAG, "skipping chunk, age: %lldus", age); + ESP_LOGW(TAG, "skipping chunk, age: %lldus", age); - wire_chunk_message_free(wireChnk); - free(wireChnk); - wireChnk = NULL; + free(chnk->payload); + free(chnk); + chnk = NULL; continue; } } -// // just for testing, print age + // just for testing, print age // if (server_now(&serverNow) >= 0) { // age = ((int64_t)serverNow.tv_sec * 1000000LL + (int64_t)serverNow.tv_usec) - -// ((int64_t)wireChnk->timestamp.sec * 1000000LL + (int64_t)wireChnk->timestamp.usec) - +// ((int64_t)chnk->timestamp.sec * 1000000LL + (int64_t)chnk->timestamp.usec) - // (int64_t)taskCfg->buffer_us + // (int64_t)taskCfg->outputBufferDacTime_us; // // ESP_LOGI(TAG, "after age: %lldus", age); // } - raw_stream_write(*(taskCfg->p_raw_stream_reader), wireChnk->payload, wireChnk->size); + raw_stream_write(*(taskCfg->p_raw_stream_writer), chnk->payload, chnk->size); - wire_chunk_message_free(wireChnk); - free(wireChnk); - wireChnk = NULL; + free(chnk->payload); + free(chnk); + chnk = NULL; } - else if (server_now(&serverNow) >= 0) - { + else if (server_now(&serverNow) >= 0) { // hard syncing // calc age in µs age = ((int64_t)serverNow.tv_sec * 1000000LL + (int64_t)serverNow.tv_usec) - - ((int64_t)wireChnk->timestamp.sec * 1000000LL + (int64_t)wireChnk->timestamp.usec) - + ((int64_t)chnk->timestamp.sec * 1000000LL + (int64_t)chnk->timestamp.usec) - (int64_t)taskCfg->buffer_us + (int64_t)taskCfg->outputBufferDacTime_us; @@ -573,39 +521,44 @@ static void snapcast_sync_task(void *pvParameters) { if (age < -(int64_t)taskCfg->buffer_us) { // fast forward - ESP_LOGI(TAG, "fast forward, age: %lldus", age); + ESP_LOGW(TAG, "fast forward, age: %lldus", age); - wire_chunk_message_free(wireChnk); - free(wireChnk); - wireChnk = NULL; + free(chnk->payload); + free(chnk); + chnk = NULL; } else if ((age >= -AGE_THRESHOLD ) && ( age <= 0 )) { initial_sync++; - wire_chunk_message_free(wireChnk); - free(wireChnk); - wireChnk = NULL; + free(chnk->payload); + free(chnk); + chnk = NULL; } else if (age > 0 ) { - ESP_LOGI(TAG, "too old, age: %lldus", age); + ESP_LOGW(TAG, "too old, age: %lldus", age); initial_sync = 0; - wire_chunk_message_free(wireChnk); - free(wireChnk); - wireChnk = NULL; + free(chnk->payload); + free(chnk); + chnk = NULL; } } + else { + vTaskDelay( pdMS_TO_TICKS(10) ); + } } } } + /** * */ static void http_get_task(void *pvParameters) { - audio_element_handle_t *p_raw_stream_reader = (audio_element_handle_t *)pvParameters; + http_task_cfg_t *httpTaskCfg = (http_task_cfg_t *)pvParameters; + audio_element_handle_t *p_raw_stream_writer; struct sockaddr_in servaddr; char *start; int sockfd; @@ -616,11 +569,14 @@ static void http_get_task(void *pvParameters) { time_message_t time_message; struct timeval tmpDiffToServer; uint8_t diffBufCnt = 0; - QueueHandle_t wireChunkQueueHandle; const int64_t outputBufferDacTime_us = DAC_OUT_BUFFER_TIME_US; // in ms TaskHandle_t syncTask = NULL; snapcast_sync_task_cfg_t snapcastTaskCfg; + struct timeval lastTimeSync = { 0, 0 }; + uint8_t bufferFull = false; + p_raw_stream_writer = httpTaskCfg->p_raw_stream_writer_to_decoder; + // create semaphore for time diff buffer to server diffBufSemaphoreHandle = xSemaphoreCreateMutex(); @@ -630,12 +586,11 @@ static void http_get_task(void *pvParameters) { id_counter = 0; // create snapcast receive buffer - wireChunkQueueHandle = xQueueCreateStatic( WIRE_CHNK_QUEUE_LENGTH, - sizeof(wire_chunk_message_t *), - wireChunkQueueStorageArea, - - &wireChunkQueue - ); + pcmChunkQueueHandle = xQueueCreateStatic( PCM_CHNK_QUEUE_LENGTH, + sizeof(wire_chunk_message_t *), + pcmChunkQueueStorageArea, + &pcmChunkQueue + ); while(1) { memset((void *)diffBuf, 0, sizeof(diffBuf)); @@ -699,26 +654,26 @@ static void http_get_task(void *pvParameters) { } bool received_header = false; - base_message_t base_message = { - SNAPCAST_MESSAGE_HELLO, - 0x0, - 0x0, - { now.tv_sec, now.tv_usec }, - { 0x0, 0x0 }, - 0x0, - }; + base_message_t base_message = { + SNAPCAST_MESSAGE_HELLO, + 0x0, + 0x0, + { now.tv_sec, now.tv_usec }, + { 0x0, 0x0 }, + 0x0, + }; - hello_message_t hello_message = { - mac_address, - "ESP32-Caster", - "0.0.2", - "libsnapcast", - "esp32", - "xtensa", - 1, - mac_address, - 2, - }; + hello_message_t hello_message = { + mac_address, + "ESP32-Caster", + "0.0.2", + "libsnapcast", + "esp32", + "xtensa", + 1, + mac_address, + 2, + }; hello_message_serialized = hello_message_serialize(&hello_message, (size_t*) &(base_message.size)); if (!hello_message_serialized) { @@ -744,7 +699,7 @@ static void http_get_task(void *pvParameters) { size = 0; result = 0; while (size < BASE_MESSAGE_SIZE) { - result = recv(sockfd, &(buff[size]), BASE_MESSAGE_SIZE - size, MSG_DONTWAIT); + result = read(sockfd, &(buff[size]), BASE_MESSAGE_SIZE - size); if (result < 0) { break; @@ -752,9 +707,10 @@ static void http_get_task(void *pvParameters) { size += result; } - // TODO: what about other errno possibilities? - if (errno == ENOTCONN) { - ESP_LOGI(TAG, "%s", strerror(errno)); + if (result < 0) { + if (errno != 0 ) { + ESP_LOGI(TAG, "%s", strerror(errno)); + } break; // stop for(;;) will try to reconnect then } @@ -782,7 +738,22 @@ static void http_get_task(void *pvParameters) { start = buff; size = 0; + + // TODO: dynamically allocate memory for the next read!!! + // generate an error for now if we try to read more than BUFF_LEN in next lines + if (base_message.size > BUFF_LEN) { + ESP_LOGE(TAG, "base_message.size too big %d", base_message.size); + + return; + } + while (size < base_message.size) { + if (size >= BUFF_LEN) { + ESP_LOGE(TAG, "Index too high"); + + return; + } + result = read(sockfd, &(buff[size]), base_message.size - size); if (result < 0) { ESP_LOGI(TAG, "Failed to read from server: %d", result); @@ -794,7 +765,11 @@ static void http_get_task(void *pvParameters) { } if (result < 0) { - break; + if (errno != 0 ) { + ESP_LOGI(TAG, "%s", strerror(errno)); + } + + break; // stop for(;;) will try to reconnect then } switch (base_message.type) { @@ -805,21 +780,15 @@ static void http_get_task(void *pvParameters) { return; } - ESP_LOGI(TAG, "Received codec header message"); - size = codec_header_message.size; start = codec_header_message.payload; + //ESP_LOGI(TAG, "Received codec header message with size %d", codec_header_message.size); + if (strcmp(codec_header_message.codec,"flac") == 0) { - // TODO: reset queue and free all memory (wirechunks + payload) if new codec header is received while stream session is ongoing - // something like the following will probably do the trick in a loop until queue is empty. get current queue size and empty it. + // TODO: maybe restart the whole thing if a new codec header is received while stream session is ongoing - //wire_chunk_message_t *wire_chunk_message - //wire_chunk_message_free(wire_chunk_message); - //free(wire_chunk_message); - //xQueueReset(wireChunkQueueHandle); // reset wire chunk queue - - raw_stream_write(*p_raw_stream_reader, codec_header_message.payload, size); + raw_stream_write(*p_raw_stream_writer, codec_header_message.payload, size); } else if (strcmp(codec_header_message.codec,"opus") == 0) { // TODO: NOT Implemented yet! @@ -867,31 +836,39 @@ static void http_get_task(void *pvParameters) { continue; } - wire_chunk_message_t *wire_chunk_message = (wire_chunk_message_t *)malloc(sizeof(wire_chunk_message_t)); - if (wire_chunk_message == NULL) { - ESP_LOGI(TAG, "Failed to allocate memory for wire chunk"); + wire_chunk_message_t wire_chunk_message; - break; - } - - result = wire_chunk_message_deserialize(wire_chunk_message, start, size); + result = wire_chunk_message_deserialize(&wire_chunk_message, start, size); if (result) { ESP_LOGI(TAG, "Failed to read wire chunk: %d\r\n", result); - wire_chunk_message_free(wire_chunk_message); - free(wire_chunk_message); + wire_chunk_message_free(&wire_chunk_message); break; } - //ESP_LOGI(TAG, "wire chnk with size: %d, timestamp %d.%d", wire_chunk_message->size, wire_chunk_message->timestamp.sec, wire_chunk_message->timestamp.usec); + //ESP_LOGI(TAG, "wire chnk with size: %d, timestamp %d.%d", wire_chunk_message.size, wire_chunk_message.timestamp.sec, wire_chunk_message.timestamp.usec); - if( xQueueSendToBack( wireChunkQueueHandle, (void *)&wire_chunk_message, ( TickType_t ) portMAX_DELAY) != pdPASS ) - { - ESP_LOGI(TAG, "Failed to post the message"); + // store chunk's timestamp, decoder callback will need it later + tv_t timestamp; + timestamp = wire_chunk_message.timestamp; - wire_chunk_message_free(wire_chunk_message); - free(wire_chunk_message); + if (xQueueSendToBack( timestampQueueHandle, ×tamp, portMAX_DELAY) == pdTRUE) { + // write encoded data to decoder pipeline, callback will trigger when it's finished + // also we do a check if all data was written successfully + int bytesWritten = 0; + + while ( bytesWritten < wire_chunk_message.size ) { + bytesWritten += raw_stream_write(*p_raw_stream_writer, wire_chunk_message.payload, wire_chunk_message.size); + if (bytesWritten < wire_chunk_message.size) { + vTaskDelay(100); + } + } } + else { + ESP_LOGW(TAG, "timestamp queue full, dropping data ..."); + } + + wire_chunk_message_free(&wire_chunk_message); break; } @@ -929,10 +906,9 @@ static void http_get_task(void *pvParameters) { audio_hal_set_volume(board_handle->audio_hal, server_settings_message.volume); if (syncTask == NULL) { - ESP_LOGI(TAG, "[ 8 ] Start snapcast_sync_task"); + ESP_LOGI(TAG, "Start snapcast_sync_task"); - snapcastTaskCfg.sync_queue_handle = &wireChunkQueueHandle; - snapcastTaskCfg.p_raw_stream_reader = p_raw_stream_reader; + snapcastTaskCfg.p_raw_stream_writer = httpTaskCfg->p_raw_stream_writer_to_i2s; snapcastTaskCfg.outputBufferDacTime_us = outputBufferDacTime_us; snapcastTaskCfg.buffer_us = (int64_t)buffer_ms * 1000LL; xTaskCreatePinnedToCore(&snapcast_sync_task, "snapcast_sync_task", 4*4096, &snapcastTaskCfg, SYNC_TASK_PRIORITY, &syncTask, SYNC_TASK_CORE_ID); @@ -980,20 +956,50 @@ static void http_get_task(void *pvParameters) { tmpDiffToServer.tv_usec /= 2; } - //ESP_LOGI(TAG, "Current latency: %ld.%06ld", tmpDiffToServer.tv_sec, tmpDiffToServer.tv_usec); +// ESP_LOGI(TAG, "Current latency: %ld.%06ld", tmpDiffToServer.tv_sec, tmpDiffToServer.tv_usec); - diffBuf[diffBufCnt++] = tmpDiffToServer; - if (diffBufCnt >= (sizeof(diffBuf)/sizeof(struct timeval))) { - diffBufCnt = 0; - } + // following code is storing / initializing / resetting diff to server algorithm + // we collect a number of latencies. Basded on these we can get the median of server now + { + struct timeval diff; + // clear diffBuffer if last update is older than a minute + timersub(&now, &lastTimeSync, &diff); + if ( diff.tv_sec > 60 ) { + ESP_LOGW(TAG, "Last time sync older than a minute. Clearing time buffer"); - set_diff_to_server(diffBuf, sizeof(diffBuf) / sizeof(struct timeval), &diffBufCnt); + memset(diffBuf, 0, sizeof(diffBuf)); + diffBufCnt = 0; + bufferFull = false; + } + + // store current time for next run + lastTimeSync.tv_sec = now.tv_sec; + lastTimeSync.tv_usec = now.tv_usec; + + diffBuf[diffBufCnt++] = tmpDiffToServer; + if (diffBufCnt >= (sizeof(diffBuf)/sizeof(struct timeval))) { + bufferFull = true; + + diffBufCnt = 0; + } + + size_t bufLen; + if (bufferFull == true) { + bufLen = sizeof(diffBuf)/sizeof(struct timeval); + } + else { + bufLen = diffBufCnt; + } + + set_diff_to_server(diffBuf, bufLen); + } break; } } + // TODO: create a dedicated task for this which is started upon connect and deleted upon disconnect // If it's been a second or longer since our last time message was // sent, do so now result = gettimeofday(&now, NULL); @@ -1038,8 +1044,6 @@ static void http_get_task(void *pvParameters) { //ESP_LOGI(TAG, "sent time sync message"); } - - vTaskDelay( 1 / portTICK_PERIOD_MS ); } if (syncTask != NULL) { @@ -1075,8 +1079,8 @@ void sntp_sync_time(struct timeval *tv_ntp) { /** * */ -void sntp_cb(struct timeval *tv) -{ struct tm timeinfo = { 0 }; +void sntp_cb(struct timeval *tv) { + struct tm timeinfo = { 0 }; time_t now = tv->tv_sec; localtime_r(&now, &timeinfo); char strftime_buf[64]; @@ -1118,15 +1122,63 @@ void set_time_from_sntp() { */ } + +int flac_decoder_write_cb(audio_element_handle_t el, char *buffer, int len, TickType_t ticks_to_wait, void *context) { + wire_chunk_message_t *pcm_chunk_message; + tv_t timestamp; + int ret = len; + + //ESP_LOGI(TAG, "flac_decoder_write_cb: got buffer with length %d", len); + + if (xQueueReceive( timestampQueueHandle, ×tamp, 0 ) == pdPASS) { + pcm_chunk_message = (wire_chunk_message_t *)malloc(sizeof(wire_chunk_message_t)); + if (pcm_chunk_message == NULL) { + ESP_LOGE(TAG, "flac_decoder_write_cb: Failed to allocate memory for pcm chunk message"); + + return AEL_IO_FAIL; + } + + pcm_chunk_message->payload = (char *)malloc(sizeof(char) * len); + if (pcm_chunk_message->payload == NULL) { + ESP_LOGE(TAG, "flac_decoder_write_cb: Failed to allocate memory for pcm chunk payload"); + + return AEL_IO_FAIL; + } + + pcm_chunk_message->size = len; + pcm_chunk_message->timestamp = timestamp; + memcpy(pcm_chunk_message->payload, buffer, len); + + ret = len; + if( xQueueSendToBack( pcmChunkQueueHandle, &pcm_chunk_message, pdMS_TO_TICKS(5)) != pdPASS ) { + ESP_LOGW(TAG, "flac_decoder_write_cb: Failed to post the message"); + + free(pcm_chunk_message->payload); + free(pcm_chunk_message); + + ret = AEL_IO_FAIL; + } + } + else { + ESP_LOGW(TAG, "flac_decoder_write_cb: failed to get timestamp for it"); + + ret = AEL_IO_FAIL; + } + + return ret; +} + /** * */ -void app_main(void) -{ - audio_pipeline_handle_t pipeline; - audio_element_handle_t raw_stream_reader, i2s_stream_writer, decoder; +void app_main(void) { + audio_pipeline_handle_t flacDecodePipeline; + audio_element_handle_t raw_stream_writer_to_decoder, decoder; + audio_pipeline_handle_t playbackPipeline; + audio_element_handle_t raw_stream_writer_to_i2s, i2s_stream_writer; esp_err_t ret; uint8_t base_mac[6]; + http_task_cfg_t httpTaskCfg = {NULL, NULL}; ret = nvs_flash_init(); if (ret == ESP_ERR_NVS_NO_FREE_PAGES || ret == ESP_ERR_NVS_NEW_VERSION_FOUND) { @@ -1135,24 +1187,6 @@ void app_main(void) } ESP_ERROR_CHECK(ret); - -// tcpip_adapter_init(); -// ESP_ERROR_CHECK( esp_event_loop_init(event_handler, NULL) ); -// wifi_init_config_t cfg = WIFI_INIT_CONFIG_DEFAULT(); -// ESP_ERROR_CHECK( esp_wifi_init(&cfg) ); -// ESP_ERROR_CHECK( esp_wifi_set_storage(WIFI_STORAGE_RAM) ); -// ESP_ERROR_CHECK( esp_wifi_set_mode(WIFI_MODE_STA) ); -// wifi_config_t sta_config = { -// .sta = { -// .ssid = CONFIG_ESP_WIFI_SSID, -// .password = CONFIG_ESP_WIFI_PASSWORD, -// .bssid_set = false -// } -// }; -// ESP_ERROR_CHECK( esp_wifi_set_config(WIFI_IF_STA, &sta_config) ); -// ESP_ERROR_CHECK( esp_wifi_start() ); -// ESP_ERROR_CHECK( esp_wifi_connect() ); - wifi_init_sta(); // Get MAC address for WiFi station @@ -1163,66 +1197,86 @@ void app_main(void) esp_log_level_set("*", ESP_LOG_WARN); esp_log_level_set(TAG, ESP_LOG_INFO); - ESP_LOGI(TAG, "[ 2 ] Start codec chip"); + ESP_LOGI(TAG, "Start codec chip"); board_handle = audio_board_init(); audio_hal_ctrl_codec(board_handle->audio_hal, AUDIO_HAL_CODEC_MODE_DECODE, AUDIO_HAL_CTRL_START); - ESP_LOGI(TAG, "[3.0] Create audio pipeline for playback"); - audio_pipeline_cfg_t pipeline_cfg = DEFAULT_AUDIO_PIPELINE_CONFIG(); - //pipeline_cfg.rb_size = 1024; - pipeline = audio_pipeline_init(&pipeline_cfg); - AUDIO_NULL_CHECK(TAG, pipeline, return); + ESP_LOGI(TAG, "Create audio pipeline for decoding"); + audio_pipeline_cfg_t flac_dec_pipeline_cfg = DEFAULT_AUDIO_PIPELINE_CONFIG(); + flacDecodePipeline = audio_pipeline_init(&flac_dec_pipeline_cfg); + AUDIO_NULL_CHECK(TAG, flacDecodePipeline, return); - ESP_LOGI(TAG, "[3.1] Create raw stream to read data from snapcast"); - raw_stream_cfg_t raw_cfg = RAW_STREAM_CFG_DEFAULT(); - raw_cfg.type = AUDIO_STREAM_WRITER; - //raw_cfg.out_rb_size = 1024; - raw_stream_reader = raw_stream_init(&raw_cfg); + ESP_LOGI(TAG, "Create raw stream to write data from snapserver to decoder"); + raw_stream_cfg_t raw_1_cfg = RAW_STREAM_CFG_DEFAULT(); + raw_1_cfg.type = AUDIO_STREAM_WRITER; + raw_1_cfg.out_rb_size = 8 * 4096; // TODO: how much is really needed? + raw_stream_writer_to_decoder = raw_stream_init(&raw_1_cfg); - ESP_LOGI(TAG, "[3.2] Create i2s stream to write data to codec chip"); - i2s_stream_cfg_t i2s_cfg = I2S_STREAM_CFG_DEFAULT(); - //i2s_cfg.task_stack = I2S_STREAM_TASK_STACK * 2; - i2s_cfg.i2s_config.sample_rate = 48000; - //i2s_cfg.i2s_config.dma_buf_count = 8; - //i2s_cfg.i2s_config.dma_buf_len = 480; - //i2s_cfg.out_rb_size = 1024; - i2s_cfg.task_core = I2S_TASK_CORE_ID; - i2s_cfg.task_prio = I2S_TASK_PRIORITY; - i2s_stream_writer = i2s_stream_init(&i2s_cfg); - - ESP_LOGI(TAG, "[2.1] Create flac decoder to decode flac file and set custom read callback"); + ESP_LOGI(TAG, "Create flac decoder to decode flac file and set custom write callback"); flac_decoder_cfg_t flac_cfg = DEFAULT_FLAC_DECODER_CONFIG(); - //flac_cfg.out_rb_size = 1024; decoder = flac_decoder_init(&flac_cfg); + audio_element_set_write_cb(decoder, flac_decoder_write_cb, NULL); - ESP_LOGI(TAG, "[3.4] Register all elements to audio pipeline"); - audio_pipeline_register(pipeline, raw_stream_reader, "raw"); - audio_pipeline_register(pipeline, decoder, "decoder"); - audio_pipeline_register(pipeline, i2s_stream_writer, "i2s"); + ESP_LOGI(TAG, "Register all elements to audio pipeline"); + audio_pipeline_register(flacDecodePipeline, raw_stream_writer_to_decoder, "raw_1"); + audio_pipeline_register(flacDecodePipeline, decoder, "decoder"); - ESP_LOGI(TAG, "[3.5] Link it together [flash]-->raw-->decoder-->i2s_stream-->[codec_chip]"); - const char *link_tag[3] = {"raw", "decoder", "i2s"}; - audio_pipeline_link(pipeline, &link_tag[0], 3); + ESP_LOGI(TAG, "Link it together [snapclient]-->raw_1-->decoder"); + const char *link_tag[2] = {"raw_1", "decoder"}; + audio_pipeline_link(flacDecodePipeline, &link_tag[0], 2); - ESP_LOGI(TAG, "[ 4 ] Set up event listener"); + ESP_LOGI(TAG, "Create audio pipeline for playback"); + audio_pipeline_cfg_t playback_pipeline_cfg = DEFAULT_AUDIO_PIPELINE_CONFIG(); + playbackPipeline = audio_pipeline_init(&playback_pipeline_cfg); + AUDIO_NULL_CHECK(TAG, playbackPipeline, return); + + ESP_LOGI(TAG, "Create raw stream to write data from decoder to i2s"); + raw_stream_cfg_t raw_2_cfg = RAW_STREAM_CFG_DEFAULT(); + raw_2_cfg.type = AUDIO_STREAM_WRITER; + raw_2_cfg.out_rb_size = 16 * 4096; // TODO: how much is really needed? + raw_stream_writer_to_i2s = raw_stream_init(&raw_2_cfg); + + ESP_LOGI(TAG, "Create i2s stream to write data to codec chip"); + i2s_stream_cfg_t i2s_cfg = I2S_STREAM_CFG_DEFAULT(); + //i2s_cfg.task_stack = I2S_STREAM_TASK_STACK * 2; + i2s_cfg.i2s_config.sample_rate = 48000; + //i2s_cfg.i2s_config.dma_buf_count = 8; + //i2s_cfg.i2s_config.dma_buf_len = 480; + //i2s_cfg.out_rb_size = 1024; + i2s_cfg.task_core = I2S_TASK_CORE_ID; + //i2s_cfg.task_prio = I2S_TASK_PRIORITY; + i2s_stream_writer = i2s_stream_init(&i2s_cfg); + + audio_pipeline_register(playbackPipeline, raw_stream_writer_to_i2s, "raw_2"); + audio_pipeline_register(playbackPipeline, i2s_stream_writer, "i2s"); + + ESP_LOGI(TAG, "Link it together [sync task]-->raw_2-->i2s_stream-->[codec_chip]"); + const char *link_tag_2[2] = {"raw_2", "i2s"}; + audio_pipeline_link(playbackPipeline, &link_tag_2[0], 2); + + ESP_LOGI(TAG, "Set up event listener"); audio_event_iface_cfg_t evt_cfg = AUDIO_EVENT_IFACE_DEFAULT_CFG(); audio_event_iface_handle_t evt = audio_event_iface_init(&evt_cfg); - ESP_LOGI(TAG, "[4.1] Listening event from all elements of pipeline"); - audio_pipeline_set_listener(pipeline, evt); + ESP_LOGI(TAG, "Listening event from all elements of pipelines"); + audio_pipeline_set_listener(flacDecodePipeline, evt); + audio_pipeline_set_listener(playbackPipeline, evt); - ESP_LOGI(TAG, "[ 5 ] Start audio_pipeline"); - audio_pipeline_run(pipeline); + ESP_LOGI(TAG, "Start audio_pipelines"); + audio_pipeline_run(flacDecodePipeline); + audio_pipeline_run(playbackPipeline); + + ESP_LOGI(TAG, "Listen for all pipeline events"); - ESP_LOGI(TAG, "[ 6 ] Listen for all pipeline events"); - // syncing to sntp #if CONFIG_USE_SNTP == 1 + // syncing to sntp vTaskDelay(5000/portTICK_PERIOD_MS); - ESP_LOGI(TAG, "[ 7 ] Syncing to sntp"); + ESP_LOGI(TAG, "Syncing to sntp"); set_time_from_sntp(); #else { + // don't use sntp, if server and client are too different, we get overflowing timevals struct timeval tv = { .tv_sec = 0, .tv_usec = 0, @@ -1237,16 +1291,28 @@ void app_main(void) nowtm = localtime(&nowtime); strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", nowtm); sprintf(buf, "%s.%06ld", tmbuf, tv.tv_usec); - ESP_LOGI(TAG, "[ 7 ] Current time is %s", buf); + ESP_LOGI(TAG, "Current time is %s", buf); } #endif - ESP_LOGI(TAG, "[ 8 ] Start snapclient task"); - xTaskCreatePinnedToCore(&http_get_task, "http_get_task", 4*4096, &raw_stream_reader, HTTP_TASK_PRIORITY, NULL, HTTP_TASK_CORE_ID); + ESP_LOGI(TAG, "Start snapclient task"); + + timestampQueueHandle = xQueueCreateStatic( TIMESTAMP_QUEUE_LENGTH, + sizeof(tv_t), + timestampQueueStorageArea, + ×tampQueue + ); + + httpTaskCfg.p_raw_stream_writer_to_decoder = &raw_stream_writer_to_decoder; + httpTaskCfg.p_raw_stream_writer_to_i2s = &raw_stream_writer_to_i2s; + xTaskCreatePinnedToCore(&http_get_task, "http_get_task", 4*4096, &httpTaskCfg, HTTP_TASK_PRIORITY, NULL, HTTP_TASK_CORE_ID); while (1) { audio_event_iface_msg_t msg; - esp_err_t ret = audio_event_iface_listen(evt, &msg, portMAX_DELAY); + esp_err_t ret; + + // listen to events + ret = audio_event_iface_listen(evt, &msg, portMAX_DELAY); if (ret != ESP_OK) { ESP_LOGE(TAG, "[ * ] Event interface error : %d", ret); continue; @@ -1268,39 +1334,50 @@ void app_main(void) audio_element_setinfo(i2s_stream_writer, &music_info); i2s_stream_set_clk(i2s_stream_writer, music_info.sample_rates, music_info.bits, music_info.channels); + continue; } /* Stop when the last pipeline element (i2s_stream_writer in this case) receives stop event */ if (msg.source_type == AUDIO_ELEMENT_TYPE_ELEMENT && msg.source == (void *) i2s_stream_writer && msg.cmd == AEL_MSG_CMD_REPORT_STATUS - && (((int)msg.data == AEL_STATUS_STATE_STOPPED) || ((int)msg.data == AEL_STATUS_STATE_FINISHED))) { + && (((int)msg.data == AEL_STATUS_STATE_STOPPED) || ((int)msg.data == AEL_STATUS_STATE_FINISHED))) + { ESP_LOGW(TAG, "[ * ] Stop event received"); break; } } - ESP_LOGI(TAG, "[ 7 ] Stop audio_pipeline"); - audio_pipeline_stop(pipeline); - audio_pipeline_wait_for_stop(pipeline); - audio_pipeline_terminate(pipeline); + ESP_LOGI(TAG, "Stop audio_pipeline"); + audio_pipeline_stop(flacDecodePipeline); + audio_pipeline_wait_for_stop(flacDecodePipeline); + audio_pipeline_terminate(flacDecodePipeline); + audio_pipeline_stop(playbackPipeline); + audio_pipeline_wait_for_stop(playbackPipeline); + audio_pipeline_terminate(playbackPipeline); - //audio_pipeline_unregister(pipeline, spiffs_stream_reader); - audio_pipeline_unregister(pipeline, raw_stream_reader); - audio_pipeline_unregister(pipeline, i2s_stream_writer); - audio_pipeline_unregister(pipeline, decoder); + audio_pipeline_unregister(flacDecodePipeline, raw_stream_writer_to_decoder); + audio_pipeline_unregister(flacDecodePipeline, decoder); + audio_pipeline_unregister(playbackPipeline, raw_stream_writer_to_i2s); + audio_pipeline_unregister(playbackPipeline, i2s_stream_writer); /* Terminal the pipeline before removing the listener */ - audio_pipeline_remove_listener(pipeline); + audio_pipeline_remove_listener(flacDecodePipeline); + audio_pipeline_remove_listener(playbackPipeline); /* Make sure audio_pipeline_remove_listener & audio_event_iface_remove_listener are called before destroying event_iface */ audio_event_iface_destroy(evt); /* Release all resources */ - audio_pipeline_deinit(pipeline); - audio_element_deinit(raw_stream_reader); - audio_element_deinit(i2s_stream_writer); + audio_pipeline_deinit(flacDecodePipeline); + audio_element_deinit(raw_stream_writer_to_decoder); audio_element_deinit(decoder); + + audio_pipeline_deinit(playbackPipeline); + audio_element_deinit(raw_stream_writer_to_i2s); + audio_element_deinit(i2s_stream_writer); + + // TODO: clean up all created tasks and delete them } diff --git a/sdkconfig.old b/sdkconfig.old index 4f0f79e..ce33cf0 100644 --- a/sdkconfig.old +++ b/sdkconfig.old @@ -121,8 +121,8 @@ CONFIG_PARTITION_TABLE_MD5=y # # Access Point configuration # -CONFIG_ESP_WIFI_SSID="zuhause" -CONFIG_ESP_WIFI_PASSWORD="dErtischlEr" +CONFIG_ESP_WIFI_SSID="" +CONFIG_ESP_WIFI_PASSWORD="" # end of Access Point configuration # @@ -419,7 +419,7 @@ CONFIG_SPIRAM_BOOT_INIT=y # CONFIG_SPIRAM_USE_MEMMAP is not set # CONFIG_SPIRAM_USE_CAPS_ALLOC is not set CONFIG_SPIRAM_USE_MALLOC=y -CONFIG_SPIRAM_MEMTEST=y +# CONFIG_SPIRAM_MEMTEST is not set CONFIG_SPIRAM_MALLOC_ALWAYSINTERNAL=16384 # CONFIG_SPIRAM_TRY_ALLOCATE_WIFI_LWIP is not set CONFIG_SPIRAM_MALLOC_RESERVE_INTERNAL=32768