package co.id.datacomsolusindo.ipphonebridge import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import org.apache.commons.io.IOUtils import org.springframework.core.io.ByteArrayResource import org.springframework.core.io.Resource import org.springframework.http.HttpEntity import org.springframework.http.HttpHeaders import org.springframework.http.HttpMethod import org.springframework.http.HttpStatus import org.springframework.messaging.converter.MappingJackson2MessageConverter import org.springframework.messaging.simp.SimpMessagingTemplate import org.springframework.messaging.simp.stomp.StompCommand import org.springframework.messaging.simp.stomp.StompHeaders import org.springframework.messaging.simp.stomp.StompSession import org.springframework.messaging.simp.stomp.StompSessionHandler import org.springframework.util.LinkedMultiValueMap import org.springframework.util.MultiValueMap import org.springframework.web.client.HttpStatusCodeException import org.springframework.web.client.RestTemplate import org.springframework.web.socket.client.standard.StandardWebSocketClient import org.springframework.web.socket.messaging.WebSocketStompClient import java.io.Serializable import java.lang.reflect.Type import java.net.URI import java.nio.charset.Charset import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentMap class GatewayClient( val socketHost: String, val clientName: String, val currentLocalPort: Int, val socketPort: Int, val srvTemplate: SimpMessagingTemplate ) { enum class State { INITIALIZED, CONNECTING, FAILED, CONNECT } var state = State.INITIALIZED var session: StompSession? = null fun connect() { try { state = State.CONNECTING val client = StandardWebSocketClient() val stompClient = WebSocketStompClient(client) stompClient.messageConverter = MappingJackson2MessageConverter() val sessionHandler = BridgeStompSessionHandler(srvTemplate) AppLog.write(this.javaClass).info("Try connect to : ws://$socketHost:$socketPort/_websocket") val cn = stompClient.connect("ws://$socketHost:$socketPort/_websocket", sessionHandler) cn.addCallback({ s -> session = s if (s != null && s.isConnected) { AppLog.write(this.javaClass).info("Socket is connected.") state = State.CONNECT } }, { e -> AppLog.write(this.javaClass).info("Can't connect to socket server.") AppLog.write(this.javaClass).error(e.message, e) state = State.FAILED }) session = sessionHandler.session } catch (e: Exception) { AppLog.write(this.javaClass) .info("Can't connect to socket server, check your configuration and log. retry in 5 second") AppLog.write(this.javaClass).error(e.message, e) state = State.FAILED } } } class BridgeStompSessionHandler(val srvTemplate: SimpMessagingTemplate) : StompSessionHandler { var session: StompSession? = null // val partialDataMap: ConcurrentMap> = ConcurrentHashMap() // val partialDataMap: ConcurrentMap> = ConcurrentHashMap() override fun handleException( session: StompSession, command: StompCommand?, headers: StompHeaders, btar: ByteArray, error: Throwable ) { AppLog.write(this.javaClass).info("Socket Handle Error") AppLog.write(this.javaClass).error("Socket Handle Error", error) } override fun handleTransportError(session: StompSession, error: Throwable) { AppLog.write(this.javaClass).info("Socket Transport Error") AppLog.write(this.javaClass).error("Socket Transport Error", error) } override fun getPayloadType(headers: StompHeaders): Type { return Any::class.java } override fun afterConnected(ss: StompSession, sh: StompHeaders) { AppLog.write(this.javaClass).info("Connected to gateway socket server") // ss.subscribe("/topic/partial/$clientName", this) // ss.subscribe("/topic/healthCheck", this) this.session = ss } override fun handleFrame(stompHeaders: StompHeaders, payload: Any?) { // "/topic/incoming/clnum" if (payload != null) { val reqId = Singletons.requestInstance.entries.map { it.key } .firstOrNull { stompHeaders.destination?.endsWith(it) ?: false } ?: "" val clientDest = ClientHolder.get().map { it.key }.firstOrNull { stompHeaders.destination?.endsWith(it) ?: false } ?: "" // when (stompHeaders.destination) { "/topic/partial/$clientDest" -> { srvTemplate.convertAndSend( "/topic/partial/$clientDest", payload ) } "/topic/response/$reqId" -> { SocketChecker().collectChunk(payload as SocketChunkData,reqId) } } } // when (stompHeaders.destination) { // "/topic/healthCheck" -> { // if ((1..100).random() == 5) { // val msgObj = payload as LinkedHashMap<*, *> // AppLog.write(javaClass).info("Socket Health Check " + msgObj["Hi"]) // } // } // "/topic/request/$clientName" -> { // //process // this.session!!.send("/app/confirm/req", "OK") // } // // "/topic/partial/$clientName" -> { // val msgObj = payload as LinkedHashMap<*, *> // val partData = PartialData( // msgObj["id"] as String, // msgObj["total"] as Int, // msgObj["idx"] as Int, // msgObj["data"] as String, // msgObj["startTime"] as Long // ) // // val pt = partialDataMap.getOrPut(payload["id"] as String) { // ConcurrentHashMap() // } // pt.getOrPut(partData.idx){partData} // val listChunk = pt.entries.sortedBy { it.key }.map { it.value } // if (listChunk.isNotEmpty() && listChunk.size == listChunk[0].total) { // val rbJs = listChunk.joinToString("") { it.data } // // val mapper = jacksonObjectMapper() // val rb = mapper.readValue(rbJs, RequestBuilder::class.java) // requestBuilderProcessor(rb, listChunk[0].startTime) // } // // } // else -> { // AppLog.write(this.javaClass) // .info("unknown message from ${stompHeaders.destination} with message $payload") // } // // } } // fun requestBuilderProcessor(rb: RequestBuilder, startTime: Long) { // var intStartTime = startTime // val headers: MultiValueMap = LinkedMultiValueMap() // rb.headers.entries.forEach { headers[it.key] = mutableListOf(it.value) } // var url = "http://localhost:" + currentLocalPort + rb.path // AppLog.write(this.javaClass).info("-----------request builder -----------") // AppLog.write(this.javaClass).info(rb.body) // AppLog.write(this.javaClass).info(rb.headers) // AppLog.write(this.javaClass).info(rb.parts) // AppLog.write(this.javaClass).info(rb.queryString) // AppLog.write(this.javaClass).info("-----------end builder -----------") // // val queryString = rb.queryString // // if (queryString != null) { // url = "$url?$queryString" // } // // val urlSend = URI(url) // AppLog.write(this.javaClass).info("socket -> url to access $url") // // val httpEnt = rb.parts?.let { // val bodyPart: MultiValueMap = LinkedMultiValueMap() // it.forEach { fp -> // bodyPart.add( // fp.name, // FileNameAwareByteArrayResource(fp.fileName, Base64.getDecoder().decode(fp.data), null) // ) // } // HttpEntity(bodyPart, headers) // } ?: HttpEntity(rb.body?.replace("\r\n", "\n")?.replace("\r", "\n")?.replace("\n", "\r\n"), headers) // // val d1 = (System.nanoTime() - intStartTime).toDouble() / 1000000 //divide by 1000000 to get milliseconds. // // AppLog.write(this.javaClass).info("Socket prc preparation. Time: $d1, ID: ${rb.id}") // intStartTime = System.nanoTime() // // var hd: HttpHeaders? = null // var stat: Int // var bdz: ByteArray? // // try { // val response = restTemplate.exchange(urlSend, rb.method, httpEnt, Resource::class.java) // bdz = if (response.body != null) { // IOUtils.toByteArray(response.body!!.inputStream) // } else { // ByteArray(0) // } // hd = response.headers // stat = response.statusCode.value() // } catch (httpError: HttpStatusCodeException) { // AppLog.write(this.javaClass).info("http error + ${httpError.message}") // AppLog.write(this.javaClass).error("http error", httpError) // bdz = httpError.responseBodyAsByteArray // hd = httpError.responseHeaders // stat = httpError.statusCode.value() // // } // catch (ex: Exception) { // AppLog.write(this.javaClass).info("request error") // AppLog.write(this.javaClass).error("request error", ex) // stat = HttpStatus.BAD_REQUEST.ordinal // bdz = "unknown error".toByteArray(Charset.defaultCharset()) // } // // val d2 = (System.nanoTime() - intStartTime).toDouble() / 1000000 //divide by 1000000 to get milliseconds. // AppLog.write(this.javaClass).info("Socket loop internal prc. Time: $d2, ID: ${rb.id}") // intStartTime = System.nanoTime() // // this.session?.let { // val chunkSize = 2000 // if (bdz != null && bdz.size > chunkSize) { // val chunk = bdz.toList().chunked(chunkSize).map { bt -> bt.toByteArray() } // chunk.forEachIndexed { idx, bt -> // if (idx < chunk.size - 1) { // it.send("/app/response/${rb.id}", SocketChunkData(bt, null, stat, idx, chunk.size)) // } else { // it.send("/app/response/${rb.id}", SocketChunkData(bt, hd, stat, idx, chunk.size))//send // } // } // } else { // //send // it.send("/app/response/${rb.id}", SocketChunkData(bdz, hd, stat, 1, 1)) // } // } // val d3 = (System.nanoTime() - intStartTime).toDouble() / 1000000 //divide by 1000000 to get milliseconds. // AppLog.write(this.javaClass).info("Socket send response. Time: $d3, ID: ${rb.id}") // val duration = (System.nanoTime() - startTime).toDouble() / 1000000 //divide by 1000000 to get milliseconds. // AppLog.write(this.javaClass).info("Bridge total process Success. Time: $duration, ID: ${rb.id}, URL: $urlSend") // // } // // class FileNameAwareByteArrayResource(private val fName: String, byteArray: ByteArray, description: String?) : // ByteArrayResource(byteArray, description) { // override fun getFilename() = fName // } // // class PartialData(val id: String, val total: Int, val idx: Int, val data: String, val startTime: Long) // // class RequestBuilder( // val id: String, // val path: String, // var method: HttpMethod, // var headers: MutableMap // ) : Serializable { // var body: String? = null // var queryString: String? = null // var parts: MutableList? = null // } // // class FilePart(val name: String, val fileName: String, val data: String) // // class SocketChunkData( // val body: ByteArray?, // val header: HttpHeaders?, // val status: Int, // val part: Int, // val totalPart: Int // ) // val messageMap: ConcurrentMap> = ConcurrentHashMap() }