// streamMultiAssetData.js

import { initializeDB } from "./db";

const WS_BASE_URL = `wss://ws.twelvedata.com/v1/quotes/price?apikey=${process.env.REACT_APP_TWELVE_API_KEY}`;
const RECONNECT_INTERVAL = 5000;

let db;
let webSockets = {}; // Store WebSocket for each asset type
let shouldReconnect = true; // Flag to control whether to reconnect

export const streamData = async () => {
    shouldReconnect = true; // Ensure reconnection is enabled when starting the stream
    db = await initializeDB(); // Initialize the database
    const assetsByType = await fetchAssetsByType(); // Fetch assets grouped by type

    for (const assetType in assetsByType) {
        setupWebSocketConnection(assetType, assetsByType[assetType]); // Setup WebSocket connections for each asset type
    }
};

async function fetchAssetsByType() {
    try {
        const tx = db.transaction('assets', 'readonly');
        const store = tx.objectStore('assets');
        const assets = await store.getAll();
        await tx.done;

        const assetsByType = { 'I': [], 'F': [], 'C': [] };
        assets.forEach(asset => {
            assetsByType[asset.assettype].push(asset.symbol);
        });
        return assetsByType;
    } catch (error) {
        console.error('Error fetching assets:', error);
        return { 'I': [], 'F': [], 'C': [] };
    }
}

function setupWebSocketConnection(assetType, symbols) {
    if (!shouldReconnect || symbols.length === 0) {
        return;
    }

    const wsURL = WS_BASE_URL;
    const socket = new WebSocket(wsURL);

    socket.onopen = () => {
        console.log(`WebSocket connected for asset type: ${assetType}`);

        // Concatenate symbols into a comma-delimited string
        const symbolsStr = symbols.join(",");

        // Send subscription message
        const subscribeMessage = {
            action: 'subscribe',
            params: { symbols: symbolsStr }
        };
        socket.send(JSON.stringify(subscribeMessage));
    };

    socket.onmessage = async (event) => {
        try {
            //console.log(`Data received for asset type: ${assetType}`, event.data);  // Log raw data
            const data = JSON.parse(event.data);
            await processData(data);
        } catch (error) {
            console.error('Error in onmessage:', error);  // Log any parsing or processing errors
        }
    };

    socket.onclose = () => {
        console.log(`WebSocket disconnected for asset type: ${assetType}, attempting to reconnect...`);
        if (shouldReconnect) {
            setTimeout(() => setupWebSocketConnection(assetType, symbols), RECONNECT_INTERVAL);
        }
    };

    socket.onerror = (error) => {
        console.error(`WebSocket error for asset type: ${assetType}:`, error);
        socket.close();
    };

    webSockets[assetType] = socket;
}


let aggregatedData = {};

async function processData(data) {
    try {
        if (data.event !== 'price') {
            return; // Ignore non-price events
        }

        if (!data.symbol || !data.timestamp || !data.price) {
            throw new Error('Missing required data fields');
        }

        const { symbol, timestamp, price } = data;
        console.log("stream",symbol, timestamp, price);
        const formattedTimestamp = new Date(timestamp * 1000);
        const minuteKey = formattedTimestamp.toISOString().slice(0, 16); // YYYY-MM-DDTHH:MM

        if (!aggregatedData[symbol]) {
            aggregatedData[symbol] = {};
        }

        // Check if a new minute has started
        if (!aggregatedData[symbol][minuteKey]) {
            // Save previous minute's data asynchronously without waiting
            for (let key in aggregatedData[symbol]) {
                if (key !== minuteKey) {
                    saveMinuteData(symbol, key).catch(error => console.error('Error saving data:', error));
                }
            }

            // Initialize data for the new minute
            aggregatedData[symbol][minuteKey] = {
                open: price,
                close: price,
                high: price,
                low: price,
            };
        } else {
            // Update data for the current minute
            const currentData = aggregatedData[symbol][minuteKey];
            currentData.close = price;
            currentData.high = Math.max(currentData.high, price);
            currentData.low = Math.min(currentData.low, price);
        }
    } catch (error) {
        console.error('Error processing data:', error);
    }
}

async function saveMinuteData(symbol, minuteKey) {
    try {
        const data = aggregatedData[symbol][minuteKey];
        const tableName = `${symbol.replace('/', '_')}-1min`;
        const tx = db.transaction(tableName, 'readwrite');
        const store = tx.objectStore(tableName);
        const minuteData = {
            symbol: symbol,
            interval: '1min',
            date: minuteKey,
            open: data.open,
            close: data.close,
            high: data.high,
            low: data.low,
            volume: '0',
            ema10: '0',
            ema20: '0',
            ema50: '0',
            ema100: '0',
            ema200: '0',
            atr: '0',
            major: '0',
            minor: '0',
        };


        await store.put(minuteData);
        await tx.done;

        // Now that the data has been successfully saved, it's safe to delete it
        delete aggregatedData[symbol][minuteKey];
    } catch (error) {
        console.error('Error saving minute data:', error);
    }
}
export const stopStreamData = () => {
    shouldReconnect = false; // Disable reconnection when stopping the stream
    for (const assetType in webSockets) {
        if (webSockets[assetType] && webSockets[assetType].readyState !== WebSocket.CLOSED) {
            webSockets[assetType].close();
        }
    }
    webSockets = {}; // Clear the webSockets object
};
