import { Subject } from 'rxjs'
import {
    getFromMapOrThrow,
    PROMISE_RESOLVE_VOID,
    randomCouchString,
} from 'rxdb'

import type {
    P2PConnectionHandler,
    P2PConnectionHandlerCreator,
    P2PMessage,
    P2PPeer,
    PeerWithMessage,
    PeerWithResponse,
} from 'rxdb/dist/types/plugins/replication-p2p/p2p-types'

import { Instance as SimplePeer, default as Peer } from 'simple-peer'
import { newRxError, RxError, RxTypeError } from 'rxdb'

/**
 * Returns a connection handler that uses simple-peer and the signaling server.
 */
export function getConnectionHandlerDatosDriveP2P(
    serverUrl: string,
    wrtc?: any,
): P2PConnectionHandlerCreator {
    const creator: P2PConnectionHandlerCreator = (options: any) => {
        const ws = new WebSocket(serverUrl)
        const connect$ = new Subject<P2PPeer>()
        const disconnect$ = new Subject<P2PPeer>()
        const message$ = new Subject<PeerWithMessage>()
        const response$ = new Subject<PeerWithResponse>()
        const error$ = new Subject<RxError | RxTypeError>()
        const peers = new Map<string, SimplePeer>()
        let heartbeat_interval: any = null;

        ws.onopen = async () => {
            const peerId = randomCouchString(10)
            ws.send(
                JSON.stringify({
                    type: 'join',
                    room: options.topic,
                    peerId,
                }),
            )
            const heartbeat = () => {
                console.log("p2p", "heartbeat", peerId)
                ws.send(
                    JSON.stringify({
                        type: 'heartbeat',
                        room: options.topic,
                        peerId,
                    }),
                )
            }

            heartbeat();
            heartbeat_interval = setInterval(() => {
                heartbeat();
            }, 30000) // 30 Seconds

            ws.addEventListener('message', (event: MessageEvent) => {
                const data = JSON.parse(event.data)
                console.log("p2p", "message received", data);
                const { type } = data
                switch (type) {
                    case 'joined':
                        data.roomPeerIds.forEach((remotePeerId: any) => {
                            if (
                                remotePeerId === peerId ||
                                peers.has(remotePeerId)
                            ) {
                                return
                            }
                            console.log("p2p", 'other user joined room ' + remotePeerId);
                            const newPeer: SimplePeer = new Peer({
                                initiator: remotePeerId > peerId,
                                wrtc,
                                trickle: true,
                            }) as any
                            peers.set(remotePeerId, newPeer)

                            newPeer.on('data', (messageOrResponse: any) => {
                                messageOrResponse = JSON.parse(
                                    messageOrResponse.toString(),
                                )
                                console.log('p2p', 'got a message from peer3: ' + messageOrResponse)
                                if (messageOrResponse.result) {
                                    response$.next({
                                        peer: newPeer as any,
                                        response: messageOrResponse,
                                    })
                                } else {
                                    message$.next({
                                        peer: newPeer as any,
                                        message: messageOrResponse,
                                    })
                                }
                            })

                            newPeer.on('signal', (signal: any) => {
                                console.log('p2p', 'emit signal from ' + peerId + ' to ' + remotePeerId);
                                ws.send(
                                    JSON.stringify({
                                        type: 'signal',
                                        from: peerId,
                                        to: remotePeerId,
                                        room: options.topic,
                                        signal,
                                    }),
                                )
                            })

                            newPeer.on('error', (error) => {
                                console.log('p2p', 'error from peer: ' + error)
                                error$.next(
                                    newRxError('RC_P2P_PEER', {
                                        error,
                                    }),
                                )
                            })

                            newPeer.on('connect', () => {
                                console.log('p2p', 'connect from peer')
                                connect$.next(newPeer as any)
                            })
                        })
                        break
                    case 'left': {
                        console.log('p2p', 'other user left room ' + data.peerId);
                        const peer = peers.get(data.peerId)
                        if (peer) {
                            peer.destroy()
                            peers.delete(data.peerId)
                            disconnect$.next(peer as any)
                        }
                        break
                    }
                    case 'signal':
                        console.log('p2p','got signal(' + peerId + ') ' + data.from + ' -> ' + data.to);
                        // eslint-disable-next-line no-case-declarations
                        const peer = getFromMapOrThrow(peers, data.from)
                        peer.signal(data.signal)
                }
            })
        }

        const handler: P2PConnectionHandler = {
            error$,
            connect$,
            disconnect$,
            message$,
            response$,
            async send(peer: P2PPeer, message: P2PMessage) {
                await (peer as any).send(JSON.stringify(message))
            },
            destroy() {
                if (heartbeat_interval) {
                    clearInterval(heartbeat_interval);
                }
                ws.close()
                error$.complete()
                connect$.complete()
                disconnect$.complete()
                message$.complete()
                response$.complete()
                return PROMISE_RESOLVE_VOID
            },
        }
        return handler
    }
    return creator
}
