فهرست منبع

new service migration

masarif 2 هفته پیش
والد
کامیت
9c8e656934

+ 1 - 10
config/general-setting.yml

@@ -182,13 +182,4 @@ redirectUrl:
 - http://localhost:4200/oauth2redirect
 - https://cmpd.telmessenger.com/oauth2redirect
 - https://app.insomnia.rest/oauth/redirect
-dataKey: aY6tKotSoUoMIi7JiTBZq8x9iwvZyb%2FfASKiQ6z66AJhzPqgjwKJeLHmtQRAz0zL2BCoDrKJl%2BezyyIV72h6NqZL0Lh7m42Nlf2mJKr3SfA%2FdVMpGEtrqllUtT5idZ%2BR
-
-#database: 
-#  type: sqlserver
-#  host: 127.0.0.1
-#  port: 1433
-#  name: dbname
-#  username: sa
-#  password: password
-#  properties: encrypt=true;trustServerCertificate=true;
+dataKey: RMyvjTZXDMBnCr3jymw9oHGUUzIJTHCULv9x%2B6xE4De56nu2OlXjkTWuk5%2B3QLfyE5PJQUBl9SIAuZOGmpIJBTHvNBFOUN%2BtwT0FzDUIKfD%2BdOcU4lJzNzo9mc2cMilw

+ 1 - 0
config/migration.yml

@@ -126,6 +126,7 @@ migration:
     table: calltransaction_20260127
     history: null
     group: null
+    unique: raw_data
     attribute:
       direction: direction
       startOfCall: start_of_call

+ 7 - 0
src/main/kotlin/com/datacomsolusindo/migration/General.kt

@@ -36,4 +36,11 @@ object General {
         }
     }
 
+    fun toTableName(migrationTarget: String): String {
+        return when (migrationTarget) {
+            "callTransaction" -> "calltransaction"
+            else -> migrationTarget.camelToSnake()
+        }
+    }
+
 }

تفاوت فایلی نمایش داده نمی شود زیرا این فایل بسیار بزرگ است
+ 11 - 9
src/main/kotlin/com/datacomsolusindo/migration/MigrationApplication.kt


+ 7 - 10
src/main/kotlin/com/datacomsolusindo/migration/MigrationEntity.kt

@@ -749,12 +749,6 @@ class QueryNativeService(
             }?.unique
         }
 
-        val fieldKey = fieldUnique?.split(";")?.mapNotNull { m -> mapData[m]?.toString() }?.joinToString(";")
-
-        val uidFromDb = temporaryDataEntity["${clazz.simpleName};$fieldKey"]?.get("uid")?.toString()
-        val uid = uidFromDb ?: ULID.random()
-        val isUpdate = uidFromDb != null
-
         val fields = mutableListOf("uid")
         val finalMap = mapData
             .filterNot { it.key == "cpid" }
@@ -764,6 +758,12 @@ class QueryNativeService(
             val t = it.replace("_id", "_uid")
             fields.add(t)
         }
+        val fieldKey = fieldUnique?.split(";")?.mapNotNull { m -> finalMap[m] }?.joinToString(";")
+
+        val uidFromDb = temporaryDataEntity["${clazz.simpleName};$fieldKey"]?.get("uid")?.toString()
+        val uid = uidFromDb ?: ULID.random()
+        val isUpdate = uidFromDb != null
+
         val structure = finalMap["parent_id"]?.toString()?.let {
             fields.add("structure")
             val parentUid = EntityUtility(
@@ -785,7 +785,7 @@ class QueryNativeService(
         val query = fieldUnique?.let { fu ->
             val uniqueOn = fu.split(";").joinToString(" AND ") { m -> "t.$m = s.$m" }
             """
-    MERGE INTO $tableName AS t
+    MERGE INTO $tableName WITH (HOLDLOCK) AS t
     USING (VALUES (${fields.joinToString() { ":$it" }})) 
         AS s(${fields.joinToString() { it.camelToSnake() }})
     ON $uniqueOn
@@ -799,9 +799,6 @@ class QueryNativeService(
         } ?: ("INSERT INTO $tableName (${fields.joinToString() { it.camelToSnake() }}) " +
                 "VALUES (${fields.joinToString() { ":$it" }})")
 
-//        val tempField = fields.joinToString() { it.camelToSnake() }
-//        val query = "INSERT INTO $tableName ($tempField) " +
-//                "VALUES (${fields.joinToString() { ":$it" }})"
         val sqlNative = apiService.em.createNativeQuery(query)
 
         if (clazz.simpleName.lowercase() != "transaction") {

+ 7 - 3
src/main/kotlin/com/datacomsolusindo/migration/MigrationService.kt

@@ -3,6 +3,9 @@ package com.datacomsolusindo.migration
 import com.datacomsolusindo.cpx_shared_code.entity.BaseEntity
 import com.datacomsolusindo.cpx_shared_code.utility.CacheData
 import com.datacomsolusindo.cpx_shared_code.utility.SimpleLogger
+import com.datacomsolusindo.migration.model.MigrationFileData
+import com.datacomsolusindo.migration.service.executorInsertData
+import com.datacomsolusindo.migration.service.executorPrepareData
 import org.springframework.beans.factory.config.YamlPropertiesFactoryBean
 import org.springframework.core.io.FileSystemResource
 import org.springframework.stereotype.Service
@@ -23,10 +26,11 @@ import kotlin.time.measureTimedValue
 val temporaryDataEntity: MutableMap<String, Map<String, Any?>> = mutableMapOf()
 val temporaryDataByCode: MutableMap<String, String?> = mutableMapOf()
 val tempPassword: MutableMap<String, String?> = mutableMapOf()
-
-val executorPrepareData = Executors.newSingleThreadExecutor()
-val executorInsertData = Executors.newSingleThreadExecutor()
+//
+//val executorPrepareData = Executors.newSingleThreadExecutor()
+//val executorInsertData = Executors.newSingleThreadExecutor()
 val queueInsertData = LinkedBlockingQueue<Pair<Class<out BaseEntity>, List<MutableMap<String, Any?>>>>(20)
+val queueInsertMigrationData = LinkedBlockingQueue<Pair<MigrationFileData, List<MutableMap<String, Any?>>>>(20)
 
 @Service
 class MigrationService(

+ 796 - 0
src/main/kotlin/com/datacomsolusindo/migration/data/FinalizedDataService.kt

@@ -0,0 +1,796 @@
+package com.datacomsolusindo.migration.data
+
+import com.datacomsolusindo.cpx_shared_code.entity.*
+import com.datacomsolusindo.cpx_shared_code.service.ApiService
+import com.datacomsolusindo.cpx_shared_code.service.CpDecrypt
+import com.datacomsolusindo.cpx_shared_code.utility.*
+import com.datacomsolusindo.migration.*
+import com.datacomsolusindo.migration.model.MigrationFileData
+import io.azam.ulidj.ULID
+import io.github.semutkecil.simplecriteria.FilterData
+import org.springframework.security.crypto.password.PasswordEncoder
+import org.springframework.stereotype.Service
+import org.springframework.transaction.annotation.Transactional
+import java.time.LocalDateTime
+import java.time.format.DateTimeFormatter
+import java.util.*
+import kotlin.math.log
+import kotlin.time.measureTimedValue
+
+@Service
+class FinalizedDataService(
+    private val cpDecrypt: CpDecrypt,
+    private val passwordEncoder: PasswordEncoder,
+    private val apiService: ApiService,
+    private val insertDataService: InsertDataService
+) {
+
+    private val logger = SimpleLogger.getLogger(this::class.java)
+
+    fun processData(
+        migrationFileData: MigrationFileData
+    ): List<MutableMap<String, Any?>> {
+        val mappedData = measureTimedValue {
+            val fields = migrationFileData.fields
+            val fieldMapping = fields.mapValues { it.value.substringAfterLast(".") }
+            val hasParent = fieldMapping.any { it.key == "parent_id" }
+            val uniqueField = fieldMapping["code"] ?: "id"
+            val uniqueFieldId = fieldMapping["id"]
+
+            val historyIndex = buildHistoryIndex(migrationFileData.historyData, uniqueField, uniqueFieldId)
+            val historyIndexById =
+                buildHistoryIndex(migrationFileData.historyData, uniqueField, uniqueFieldId, byId = true)
+            val groupIndex = buildGroupIndex(migrationFileData.groupData, uniqueFieldId ?: uniqueField)
+
+            val fieldRoots = fields.filterValues { !it.contains(".") }
+            val joinRoots = fields.filterValues { it.contains(".") }
+
+            migrationFileData.rootData!!
+//                .take(10)
+                .map { row ->
+                    buildRow(
+                        row,
+                        fieldRoots,
+                        joinRoots,
+                        historyIndex,
+                        historyIndexById,
+                        groupIndex,
+                        uniqueField,
+                        uniqueFieldId
+                    )
+                }.let { data ->
+                    if (hasParent) {
+                        data.sortedBy { it["structure"].toString().length }
+                    } else {
+                        data.map { postProcessPassword(it) }
+                    }
+                }
+        }
+
+        logger.info(
+            "finalized data service table ${migrationFileData.table} " +
+                    "takes time ${mappedData.duration.inWholeMilliseconds}ms " +
+                    "with all data ${mappedData.value.size}"
+        )
+
+        return mappedData.value
+    }
+
+    private fun buildHistoryIndex(
+        historyData: List<Map<String, Any?>>?,
+        uniqueField: String,
+        uniqueFieldId: String?,
+        byId: Boolean = false
+    ): Map<Any?, Map<String, Any?>>? {
+        if (historyData == null || uniqueFieldId == null) return null
+        return historyData
+            .groupBy { if (byId) it[uniqueFieldId] else it[uniqueField] }
+            .mapValues { (_, items) ->
+                items.maxByOrNull {
+                    it[uniqueFieldId.removePrefix("history.")]
+                        ?.toString()
+                        ?.toIntOrNull() ?: 0
+                } as Map<String, Any?>
+            }
+    }
+
+    private fun buildGroupIndex(groupData: List<Map<String, Any?>>?, key: String): Map<String?, Map<String, Any?>>? {
+        return groupData?.associateBy { it[key]?.toString() }
+    }
+
+    private fun buildRow(
+        row: Map<String, Any?>,
+        fieldRoots: Map<String, String>,
+        joinRoots: Map<String, String>,
+        historyIndex: Map<Any?, Map<String, Any?>>?,
+        historyIndexById: Map<Any?, Map<String, Any?>>?,
+        groupIndex: Map<String?, Map<String, Any?>>?,
+        uniqueField: String,
+        uniqueFieldId: String?
+    ): MutableMap<String, Any?> {
+        val data = mutableMapOf<String, Any?>()
+        // Direct fields
+        fieldRoots.forEach { (target, source) ->
+            data[target] = when {
+                target == "pbx__default" -> source
+                else -> row[source]
+            }
+        }
+        // Join fields
+        joinRoots.forEach { (target, sourceFull) ->
+            val value = when {
+                sourceFull.startsWith("history.") -> {
+                    val key = sourceFull.removePrefix("history.")
+                    val valueHistories = historyIndex
+                        ?.get(row[uniqueField])
+                        ?.get(key.substringAfterLast("."))
+                    valueHistories?.let { valHis ->
+                        if (target.contains("_")) {
+                            val codeKey = uniqueField.removePrefix("history.")
+                            val joinCode = historyIndexById?.get(valHis)?.get(codeKey)
+                            joinCode ?: valHis
+                        } else valHis
+                    }
+                }
+
+                sourceFull.startsWith("group.") -> {
+                    val key = sourceFull.removePrefix("group.")
+                    val idKey = historyIndex
+                        ?.get(row[uniqueField])
+                        ?.get(uniqueFieldId)
+                        ?: row[uniqueFieldId]
+                    groupIndex
+                        ?.get(idKey?.toString())
+                        ?.get(key.substringAfterLast("."))
+                }
+
+                else -> row[sourceFull]
+            }
+            data[target] = value
+        }
+        return data
+    }
+
+    private fun postProcessPassword(data: MutableMap<String, Any?>): MutableMap<String, Any?> {
+        val raw = data["password"]?.toString() ?: return data
+        data["password"] = when {
+            raw.isBlank() -> ""
+            else -> cpDecrypt.decrypt(raw)?.let { plain ->
+                tempPassword[plain] ?: passwordEncoder.encode(plain).also {
+                    tempPassword[plain] = it
+                }
+            } ?: ""
+        }
+        return data
+    }
+
+    fun finalizeMap(table: String, map: MutableMap<String, Any?>): MutableMap<String, Any?> {
+        val mapFinalize: MutableMap<String, Any?> = mutableMapOf()
+        map["code"]?.toString()?.let {
+            map["code"] = it.ifBlank { UUID.randomUUID().toString().take(6) }
+        }
+        map["name"]?.toString()?.let {
+            map["name"] = it.ifBlank {
+                "Auto ${(0..99999).random().toString().padStart(5, '0')}"
+            }
+        }
+
+        if (table == "phone_user" || table == "cost_center" || table == "organization") {
+            map["emailOnOverBudget"] = map["emailOnOverBudget"]?.toString()?.toInt() ?: 0
+        }
+
+
+        map["direction"]?.toString()?.let {
+            map["direction"] = if (table == "calltransaction") {
+                it.split("").mapNotNull { m ->
+                    when (m.trim()) {
+                        "C" -> Direction.INCOMING.ordinal //"C"
+                        "G" -> Direction.OUTGOING.ordinal //"G"
+                        "I" -> Direction.INTERNAL.ordinal //"I"
+                        else -> null
+                    }
+                }.joinToString("")
+            } else {
+                it.split("").mapNotNull { m ->
+                    when (m.trim()) {
+                        "C" -> "C"
+                        "G" -> "G"
+                        "I" -> "I"
+                        else -> null
+                    }
+                }.joinToString("")
+            }
+        }
+
+        map.filterNot { it.key == "id" || it.key == "structure" }.forEach { (t, u) ->
+            when {
+                t.contains("__") -> {
+                    mapFinalize[t] = u
+                }
+
+                t.contains("_") -> {
+                    val isParent = t.startsWith("parent")
+                    val value = if (t == "pbx_id") {
+                        findUidByCode("pbx", u ?: "PBX01")
+                    } else u?.toString()?.let { code ->
+                        val clazzEntity = if (isParent) table else t.split("_")[0]
+                        val field = if (t.split("_")[0].lowercase().startsWith("corcos")) "command" else "code"
+                        findUidByCode(clazzEntity, code, field)
+                    }
+
+                    if (value != null) {
+                        mapFinalize[t] = value
+                    }
+                }
+
+                else -> {
+                    mapFinalize[t] = u
+                }
+            }
+        }
+
+        mapFinalize["pin"]?.toString()?.let {
+            mapFinalize["pin"] = if (it.isBlank()) null
+            else cpDecrypt.decrypt(it)?.let { p ->
+                if (table == "phone_user") ToolAes.encrypt(p) else p
+            }
+        }
+
+        if (table == "calltransaction") {
+            val to = mapFinalize["extTransferTo"]?.toString() ?: ""
+            val from = mapFinalize["extTransferFrom"]?.toString() ?: ""
+            mapFinalize["transferType"] = when {
+                to.isBlank() && from.isBlank() -> TransferType.DIRECT.ordinal
+                to.isNotBlank() && from.isBlank() -> TransferType.TRANSFER_TO.ordinal
+                to.isBlank() && from.isNotBlank() -> TransferType.TRANSFER_FROM.ordinal
+                else -> TransferType.TRANSFER_FROM_AND_TRANSFER_TO.ordinal
+            }
+        }
+
+        if (table == "web_user") {
+            defaultProfile?.let { mapFinalize["profile_id"] = it }
+            mapFinalize["canRequest"] = 0
+            mapFinalize["numberRightsApproval"] = 0
+            mapFinalize["requestForOthers"] = 0
+            val pinPassword = map["loginPin"]?.toString()?.toInt()?.let { i ->
+                if (i == 1) {
+                    map["phoneUser_id"]?.toString()?.let { uid ->
+                        findPinPhonePbx(uid)?.let { phoneUserPbx ->
+                            mapFinalize["pinext_uid"] = phoneUserPbx.first
+                            ToolAes.decrypt(phoneUserPbx.second)
+                        }
+                    }
+                } else null
+            }
+            mapFinalize["password"] = when {
+                pinPassword != null -> {
+                    tempPassword[pinPassword] ?: run {
+                        val pass = passwordEncoder.encode(pinPassword)
+                        tempPassword[pinPassword] = pass
+                        pass
+                    }
+                }
+
+                else -> if (mapFinalize["password"].toString().isEmpty()) {
+                    tempPassword["12345"] ?: run {
+                        val pass = passwordEncoder.encode("12345")
+                        tempPassword["12345"] = pass
+                        pass
+                    }
+                } else mapFinalize["password"]
+            }
+        }
+
+        if (table == "corcos") {
+            mapFinalize["name"] = "Corcos ${map["name"]}"
+        }
+
+        if (table == "trunk") {
+            mapFinalize["abonemen"] = 0
+        }
+
+        if (table == "organization") {
+            mapFinalize["memberLimit"] = 0
+        }
+
+        if (table == "phone_user") {
+            mapFinalize["asApprover"] = 0
+            mapFinalize["bypassApproval"] = 0
+            mapFinalize["limitStatus"] = 0
+        }
+
+        if (table == "account") {
+            mapFinalize["number"] = "9"
+        }
+
+        return mapFinalize.filterNot { it.key == "loginPin" }
+            .mapValues { v -> v.value?.toString() } as MutableMap<String, Any?>
+    }
+
+    private fun findUidByCode(table: String, value: Any, field: String = "code", select: String = "uid"): String? {
+        return if (value.toString().isBlank()) null else {
+            val tableName = General.toTableName(table)
+            temporaryDataByCode["$tableName;$select;$value"] ?: run {
+                try {
+                    val fd = if (table == "trunk") " AND subscribed_no != 'new trunk' " else " "
+                    apiService.transaction { em ->
+                        val result = em.createNativeQuery("SELECT $select FROM $tableName WHERE $field = :$field $fd")
+                            .setParameter(field, value.toString())
+                            .resultList
+                            .first() as String?
+                        temporaryDataByCode["$tableName;$select;$value"] = result
+                        result
+                    }
+                } catch (e: Exception) {
+                    logger.info("failed find uid $tableName $field $value")
+                    null
+                }
+            }
+        }
+    }
+
+    private fun findCodeByUid(table: String, value: Any, field: String = "uid", select: String = "code"): String? {
+        return if (value.toString().isBlank()) null else {
+            val tableName = General.toTableName(table)
+            temporaryDataByCode["$tableName;$select;$value"] ?: run {
+                try {
+                    apiService.transaction { em ->
+                        val result = em.createNativeQuery("SELECT $select FROM $tableName WHERE $field = :$field")
+                            .setParameter(field, value.toString())
+                            .resultList
+                            .first() as String?
+                        temporaryDataByCode["$tableName;$select;$value"] = result
+                        result
+                    }
+                } catch (e: Exception) {
+                    logger.info("failed find code $tableName $field $value")
+                    null
+                }
+            }
+        }
+    }
+
+    private val defaultProfile: Any? by lazy {
+        apiService.findListPage(
+            Rights::class.java,
+            listOf("uid"),
+            FilterData.filter("name", FilterData.FILTEROP.EQ, "Default")
+        ).firstOrNull()?.get("uid")
+    }
+
+    private fun findPinPhonePbx(phoneUserCode: String): Pair<String, String>? {
+        val key = "${PhoneUserPbx::class.java.simpleName};$phoneUserCode"
+        val tmpData = temporaryDataEntity[key] ?: run {
+            apiService.findListAll(
+                PhoneUserPbx::class.java,
+                listOf("pin", "pbx.uid", "phoneUser.code")
+            ).forEach { dt ->
+                temporaryDataEntity["${PhoneUserPbx::class.java.simpleName};${dt["phoneUser.code"]!!.toString()}"] = dt
+            }
+            //.associateBy { it["phoneUser.code"]!!.toString() }
+            temporaryDataEntity[key]
+        }
+
+        return tmpData?.let { it["pbx.uid"].toString() to it["pin"].toString() }
+    }
+
+    fun buildToInsertData(table: String, map: MutableMap<String, Any?>, uniqueTable: String?): Int {
+        return try {
+            val finalizer = finalizeMap(table, map)
+            val phoneUserPin = finalizer["pin"]
+            val phoneUserExtension = finalizer["extension"]
+            val corcosNormal = finalizer["corcosNormal_id"]
+            val corcosReducing = finalizer["corcosReducing_id"]
+            val corcosBlock = finalizer["corcosBlock_id"]
+
+            // budget
+            val budgetAnnual = (finalizer["budget.maxCost"]?.toString()
+                ?: finalizer["budget__maxCost"]?.toString())?.toDoubleOrNull()
+            val warningAnnual = (finalizer["budget.warnCost"]?.toString()
+                ?: finalizer["budget__warnCost"]?.toString())?.toDoubleOrNull()
+            val budgetMaxCost = budgetAnnual?.let { max ->
+                List(12) { max }.joinToString(";")
+            }
+            val budgetWarnCost = warningAnnual?.let { warn ->
+                val monthBudget = budgetAnnual ?: 0.0
+                val warnPercent = (warn / monthBudget) * 100
+                List(12) { warnPercent.toInt() }.joinToString(";")
+            }
+
+            // phoneUserPbx
+            var phoneUserPbxIds: MutableList<String> = mutableListOf()
+            val finalMapEntity = when (table) {
+                "calltransaction" -> {
+                    val callMap = prepareDataCallTransaction(finalizer)
+                    callMap["additionalData1"] = "migration"
+                    callMap
+                }
+
+                "phone_user" -> {
+                    val pbxId = finalizer["pbx.id"]?.toString() ?: finalizer["pbx.list"]?.toString() ?: ""
+                    val pbxGroup = finalizer["pbx__list"]?.toString()?.split(";")
+                        ?.mapNotNull { findUidByCode("pbx", it) } ?: listOf()
+                    if (pbxId.isBlank() && pbxGroup.isEmpty()) {
+                        finalizer["pbx__default"]?.toString()?.let { pbx ->
+                            if (pbx.isNotBlank()) {
+                                phoneUserPbxIds.add(pbx)
+                            }
+                        }
+                    } else {
+                        if (pbxId.isNotBlank()) {
+                            phoneUserPbxIds.addAll(pbxId.split(";"))
+                        }
+                        if (pbxGroup.isNotEmpty()) {
+                            phoneUserPbxIds.addAll(pbxGroup)
+                        }
+                    }
+
+                    val finalMap = finalizer.filterNot { fi ->
+                        listOf(
+                            "pbx.list",
+                            "pbx__list",
+                            "pbx__default",
+                            "pbx_id",
+                            "extension",
+                            "pin",
+                            "budget.maxCost",
+                            "budget__maxCost",
+                            "budget.warnCost",
+                            "budget__warnCost",
+                            "maxCost",
+                            "warnCost",
+                            "corcosNormal_id",
+                            "corcosReducing_id",
+                            "corcosBlock_id"
+                        ).any { a -> a == fi.key }
+                    } as MutableMap<String, Any?>
+                    finalMap["corcos"] = if (phoneUserPbxIds.isEmpty()) "" else "[${
+                        phoneUserPbxIds.distinct().joinToString(",") { m ->
+                            "{\"pbx\":\"$m\"," +
+                                    "\"normal\":\"${corcosNormal ?: ""}\"," +
+                                    "\"reducing\":\"${corcosReducing ?: ""}\"," +
+                                    "\"block\":\"${corcosBlock ?: ""}\"}"
+                        }
+                    }]"
+                    finalMap
+                }
+
+                else -> finalizer
+            }
+
+            insertDataService.insertDataWithNativeQuery(
+                table,
+                uniqueTable,
+                finalMapEntity,
+                functionAfter = { uid, isUpdate ->
+                    // create budget
+                    if (BudgetUserType.entries.any { a -> a.name == table.uppercase() }) {
+                        insertDataService.insertDataWithNativeQuery(
+                            "budget", uniqueTable = null, mutableMapOf(
+                                "userType" to BudgetUserType.valueOf(table.uppercase()).ordinal,
+                                "userUid" to uid,
+                                "type" to BudgetType.FLAT.ordinal,
+                                "annualCost" to (budgetAnnual?.let { it * 12 }?.toInt() ?: 0),
+                                "accumulate" to 0,
+                                "maxCost" to (budgetMaxCost ?: "0;0;0;0;0;0;0;0;0;0;0;0"),
+                                "warnCostPercentage" to (budgetWarnCost ?: "0;0;0;0;0;0;0;0;0;0;0;0"),
+                                "tempCost" to "0;0;0;0;0;0;0;0;0;0;0;0",
+                                "maxAutoCalculate" to "1;1;1;1;1;1;1;1;1;1;1;1"
+                            )
+                        )
+                    }
+
+                    // create phoneUserPbx
+                    if (phoneUserPbxIds.isNotEmpty()) {
+                        phoneUserPbxIds.distinct().forEach { pbxId ->
+                            insertDataService.insertDataWithNativeQuery(
+                                "phoneuserpbx", uniqueTable = null, mutableMapOf(
+                                    "pin" to phoneUserPin,
+                                    "extension" to phoneUserExtension,
+                                    "pbx_id" to pbxId,
+                                    "phoneUser_id" to uid,
+                                    "corcos_normal_uid" to corcosNormal,
+                                    "corcos_reducing_uid" to corcosReducing,
+                                    "corcos_block_uid" to corcosBlock
+                                )
+                            )
+                        }
+                    }
+                })
+
+            1
+        } catch (e: Exception) {
+            logger.error("failed insert data $table", e)
+            0
+        }
+    }
+
+    private fun buildRawData(data: MutableMap<String, Any?>): List<Any?> {
+        val seg1 = "CDR"
+        val seg2 = data["pbx_id"]?.let { findCodeByUid("pbx", it) ?: "" } ?: ""
+        val seg3 = data["direction"]?.let { Direction.entries[it.toString().toInt()].code } ?: ""
+
+        val startOfCall = data["startOfCall"]?.toString()?.let {
+            LocalDateTime.parse(it, DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS"))
+        }
+        val endOfCall = startOfCall?.plusSeconds(data["duration"]?.toString()?.toLong() ?: 0)
+
+        val seg4 = startOfCall?.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) ?: ""
+        val seg5 = startOfCall?.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) ?: ""
+        val seg6 = endOfCall?.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) ?: ""
+        val seg7 = ""
+        val seg8 = data["duration"]
+        val seg9 = data["accessNumber"] ?: ""
+        val seg10 = ""
+        val seg11 = data["trunk_id"]?.let { findCodeByUid("trunk", it) ?: "" }
+        val seg12 = ""
+        val seg13 = data["number"] ?: ""
+        val seg14 = data["callerNumber"] ?: ""
+        val seg15 = data["pin"] ?: ""
+        val seg16 = ""
+        val seg17 = data["extension"] ?: ""
+        val seg18 = data["extTransferFrom"] ?: ""
+        val seg19 = data["extTransferTo"] ?: ""
+        val seg20 = data["transferType"]?.let {
+            when (it as TransferType) {
+                TransferType.DIRECT -> "D"
+                TransferType.TRANSFER_TO -> "TT"
+                TransferType.TRANSFER_FROM -> "TF"
+                else -> "F"
+            }
+        } ?: ""
+        val seg21 = "" //data["conferenceType"]?.let { (it as ConferenceType).code } ?: ""
+        val seg22 = "" //data["communicationType"]?.let { (it as CommunicationType).code } ?: ""
+        val seg23 = data["redirectReason"] ?: ""
+        val seg24 = data["terminationCode"] ?: ""
+        val seg25 = data["additionalData1"] ?: ""
+        val seg26 = data["additionalData2"] ?: ""
+
+        return listOf(
+            seg1, seg2, seg3, seg4, seg5, seg6, seg7, seg8, seg9, seg10,
+            seg11, seg12, seg13, seg14, seg15, seg16, seg17, seg18, seg19, seg20,
+            seg21, seg22, seg23, seg24, seg25, seg26
+        )
+
+    }
+
+    private fun prepareDataCallTransaction(data: MutableMap<String, Any?>): MutableMap<String, Any?> {
+        data["transferType"] = when {
+            (data["extTransferFrom"]?.toString()?.isBlank() ?: false)
+                    && (data["extTransferTo"]?.toString()?.isBlank() ?: false) -> TransferType.DIRECT
+            (data["extTransferFrom"]?.toString()?.isBlank() ?: false)
+                    && (data["extTransferTo"]?.toString()?.isNotBlank() ?: false) -> TransferType.TRANSFER_TO
+            (data["extTransferFrom"]?.toString()?.isNotBlank() ?: false)
+                    && (data["extTransferTo"]?.toString()?.isBlank() ?: false) -> TransferType.TRANSFER_FROM
+            else -> null
+        }
+        val buildRawData = buildRawData(data)
+        val rawData = buildRawData.joinToString(",")
+
+        data["rawData"] = rawData
+        getCallTo(buildRawData[12].toString())?.forEach { (t, u) -> data[t] = u }
+
+        data["area_uid"]?.toString()?.let { area ->
+            val domainFrom = getCallFrom(area, buildRawData[10]?.toString(), buildRawData[1]?.toString())
+            data["zone_uid"]?.toString()?.let {
+                val zoneDomain = if (data["domain"] == domainFrom) "LOC" else "NDD"
+                data["zone_uid"] = findUidByCode("zone", zoneDomain)
+            }
+        }
+        data["transferType"]?.let {
+            data["transferType"] = (it as TransferType).ordinal
+        }
+        data.remove("domain")
+        return data
+
+    }
+
+    private fun getCallTo(number: String): Map<String, Any?>? {
+        return try {
+            apiService.transaction { em ->
+                val hlr = em.createNativeQuery(
+                    "SELECT TOP 1 prefix, provider_uid, area_uid, phone_type, zone_uid, domain \n" +
+                            "FROM hlr\n" +
+//                            "WHERE '$number' LIKE prefix + '%'\n" +
+                            "WHERE '$number' LIKE prefix + '%'\n" +
+                            "ORDER BY LEN(prefix) DESC"
+                ).singleResult as Array<Any?>
+                mapOf(
+                    "prefix" to hlr[0].toString(),
+                    "provider_to_uid" to hlr[1].toString(),
+                    "area_uid" to hlr[2].toString(),
+                    "phone_type" to hlr[3].toString(),
+                    "zone_uid" to hlr[4].toString(),
+                    "domain" to hlr[5].toString()
+                )
+            }
+        } catch (e: Exception) {
+            logger.info("failed get call to attribute number $number")
+//            logger.error("failed get call to attribute number $number", e)
+            null
+        }
+    }
+
+    private fun getCallFrom(areaUid: String, trunkCode: String?, pbxCode: String?): String? {
+        return try {
+            val trunk = trunkCode?.let { findUidByCode("trunk", it, select = "provider_uid") }
+            val pbx = pbxCode?.let { findUidByCode("pbx", it, select = "provider_uid") }
+
+            apiService.transaction { em ->
+                val hlr = em.createNativeQuery(
+                    "SELECT TOP 1 domain \n" +
+                            "FROM hlr\n" +
+//                            "WHERE '$number' LIKE prefix + '%'\n" +
+                            "WHERE area_uid = '$areaUid' AND provider_uid = '${trunk ?: pbx ?: "#uid"}' \n"
+                ).singleResult as Array<Any?>
+                hlr[0].toString()
+            }
+        } catch (e: Exception) {
+            logger.info("failed get call from attribute trunk $trunkCode pbx $pbxCode")
+//            logger.error("failed get call to attribute number $number", e)
+            null
+        }
+    }
+
+
+}
+
+@Service
+@Transactional
+class InsertDataService(
+    val apiService: ApiService,
+    val migrationSettingService: MigrationSettingService
+) {
+
+    private fun findParentStructure(table: String, uid: String): String? {
+        val parentStructure = temporaryDataByCode["$table;structure;$uid"] ?: run {
+            try {
+                apiService.transaction { em ->
+                    val result = em.createNativeQuery("SELECT structure FROM $table WHERE uid = :uid")
+                        .setParameter("uid", uid)
+                        .resultList
+                        .first() as String?
+                    temporaryDataByCode["$table;structure;$uid"] = result ?: uid
+                    result ?: uid
+                }
+            } catch (e: Exception) {
+                SimpleLogger.getLogger(this::class.java).error("failed get parent structure $table $uid", e)
+                uid
+            }
+        }
+        return parentStructure
+    }
+
+    fun insertDataWithNativeQuery(
+        table: String,
+        uniqueTable: String?,
+        mapData: MutableMap<String, Any?>,
+        functionAfter: ((uid: String, isUpdate: Boolean) -> Unit)? = null
+    ): String? {
+        val fieldUnique = when (table) {
+            "phoneuserpbx" -> "phone_user_uid;pbx_uid"
+            "budget" -> "user_uid"
+            else -> uniqueTable
+        }
+
+        val fields = if (table == "calltransaction") mutableListOf() else mutableListOf("uid")
+        val finalMap = mapData
+            .filterNot { it.key == "cpid" }
+            .mapValues { it.value?.toString()?.ifEmpty { null } }
+            .toMutableMap()
+
+        finalMap.keys.forEach {
+            val t = it.replace("_id", "_uid")
+            fields.add(t)
+        }
+
+        val fieldKey = fieldUnique?.split(";")?.mapNotNull { m -> mapData[m]?.toString() }?.joinToString(";")
+        val uidFromDb = temporaryDataEntity["$table;$fieldKey"]?.get("uid")?.toString()
+        val uid = uidFromDb ?: ULID.random()
+        val isUpdate = uidFromDb != null
+
+        if (isUpdate) {
+            finalMap.remove("uid")
+        }
+
+//        val structure = finalMap["parent_id"]?.toString()?.let {
+//            fields.add("structure")
+//            val parentUid = EntityUtility(
+//                apiService, General.clazzEntity(className.camelCase())!!
+//            ).parentStructure(it)
+//            "$parentUid|$uid"
+//        }
+        val structure = finalMap["parent_id"]?.let {
+            findParentStructure(table, it)?.let { parentUid ->
+                fields.add("structure")
+                "$parentUid|$uid"
+            }
+        }
+
+
+        val query = fieldUnique?.let { fu ->
+            val uniqueOn = fu.split(";").joinToString(" AND ") { m -> "t.$m = s.$m" }
+            """
+    MERGE INTO $table WITH (HOLDLOCK) AS t
+    USING (VALUES (${fields.joinToString() { ":$it" }})) 
+        AS s(${fields.joinToString() { it.camelToSnake() }})
+    ON $uniqueOn
+    WHEN MATCHED THEN
+        UPDATE SET 
+            ${fields.joinToString(",\n") { "t.${it.camelToSnake()} = s.${it.camelToSnake()}" }}
+    WHEN NOT MATCHED THEN
+        INSERT (${fields.joinToString { it.camelToSnake() }})
+        VALUES (${fields.joinToString { "s.${it.camelToSnake()}" }});
+""".trimIndent()
+        } ?: ("INSERT INTO $table (${fields.joinToString() { it.camelToSnake() }}) " +
+                "VALUES (${fields.joinToString() { ":$it" }})")
+
+        val sqlNative = apiService.em.createNativeQuery(query)
+
+        if (table != "calltransaction" && !isUpdate) {
+            sqlNative.setParameter("uid", uid)
+        }
+
+        structure?.let { sqlNative.setParameter("structure", structure) }
+        finalMap.forEach { (t, u) -> sqlNative.setParameter(t.replace("_id", "_uid"), u) }
+        sqlNative.executeUpdate()
+
+//        functionAfter?.invoke(uid, isUpdate)
+
+        return uid
+    }
+
+
+    fun updateRawData() {
+        try {
+            apiService.transaction { em ->
+                em.createNativeQuery("UPDATE ct\n" +
+                        "SET raw_data = CONCAT_WS(',',\n" +
+                        "\t'CDR',\n" +
+                        "    ISNULL(pbx.code, ''),\n" +
+                        "    CASE \n" +
+                        "        WHEN ct.direction = 0 THEN 'G'\n" +
+                        "        WHEN ct.direction = 1 THEN 'C'\n" +
+                        "        ELSE 'I'\n" +
+                        "    END,\n" +
+                        "    CONVERT(VARCHAR, ct.start_of_call, 120),\n" +
+                        "    CONVERT(VARCHAR, ct.start_of_call, 120),\n" +
+                        "    CONVERT(VARCHAR, DATEADD(SECOND, ct.duration, ct.start_of_call), 120),\n" +
+                        "    '',\n" +
+                        "    ct.duration,\n" +
+                        "    ISNULL(ct.access_number, ''),\n" +
+                        "    '',\n" +
+                        "    ISNULL(trunk.code, ''),\n" +
+                        "    '',\n" +
+                        "    ISNULL(ct.number,''),\n" +
+                        "    ISNULL(ct.caller_number,''),\n" +
+                        "    ISNULL(ct.pin,''),\n" +
+                        "    '',\n" +
+                        "    ISNULL(ct.extension,''),\n" +
+                        "    ISNULL(ct.ext_transfer_from,''),\n" +
+                        "    ISNULL(ct.ext_transfer_to,''),\n" +
+                        "    ISNULL(CASE WHEN ct.transfer_type = 0 THEN 'D'\n" +
+                        "\tWHEN ct.transfer_type = 1 THEN 'TF'\n" +
+                        "\tWHEN ct.transfer_type = 2 THEN 'TT'\n" +
+                        "\tELSE 'F' END,''),\n" +
+                        "    '',\n" +
+                        "    '',\n" +
+                        "    ISNULL(ct.redirect_reason,''),\n" +
+                        "    ISNULL(ct.termination_code,''),\n" +
+                        "    CASE \n" +
+                        "        WHEN ct.additional_data1 = 'migration' THEN ''\n" +
+                        "        ELSE ISNULL(ct.additional_data1,'')\n" +
+                        "    END,\n" +
+                        "    ISNULL(ct.additional_data2,'')\n" +
+                        ")\n" +
+                        "FROM calltransaction ct\n" +
+                        "LEFT JOIN pbx pbx ON pbx.uid = ct.pbx_uid\n" +
+                        "LEFT JOIN trunk trunk ON trunk.uid = ct.trunk_uid\n" +
+                        "WHERE ct.raw_data IS NULL OR ct.raw_data = ''")
+            }.executeUpdate()
+        } catch (e: Exception) {
+            SimpleLogger.getLogger(this::class.java).error("failed update raw data blank data", e)
+        }
+    }
+
+}

+ 16 - 0
src/main/kotlin/com/datacomsolusindo/migration/model/MigrationFileData.kt

@@ -0,0 +1,16 @@
+package com.datacomsolusindo.migration.model
+
+import java.nio.file.Path
+
+class MigrationFileData(
+    val table: String,
+    val unique: String?,
+    val fields: Map<String, String>,
+    val migrationFile: Path,
+    val historyFile: Path? = null,
+    val groupFile: Path? = null
+) {
+    var rootData: List<Map<String, Any?>>? = null
+    var historyData: List<Map<String, Any?>>? = null
+    var groupData: List<Map<String, Any?>>? = null
+}

+ 253 - 0
src/main/kotlin/com/datacomsolusindo/migration/service/MigrationFileService.kt

@@ -0,0 +1,253 @@
+package com.datacomsolusindo.migration.service
+
+import com.datacomsolusindo.cpx_shared_code.utility.SimpleLogger
+import com.datacomsolusindo.cpx_shared_code.utility.Util
+import com.datacomsolusindo.migration.General
+import com.datacomsolusindo.migration.MigrationSettingService
+import com.datacomsolusindo.migration.camelToSnake
+import com.datacomsolusindo.migration.data.FinalizedDataService
+import com.datacomsolusindo.migration.model.MigrationFileData
+import com.datacomsolusindo.migration.queueInsertMigrationData
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.async
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.runBlocking
+import org.springframework.stereotype.Service
+import java.io.File
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.nio.file.Path
+import java.nio.file.Paths
+import java.time.LocalDateTime
+import java.time.format.DateTimeFormatter
+import kotlin.time.measureTimedValue
+
+@Service
+class MigrationFileService(
+    private val migrationSettingService: MigrationSettingService,
+    private val finalizedDataService: FinalizedDataService
+) {
+
+    private val logger = SimpleLogger.getLogger(this::class.java)
+
+    private val TEMP = "temp_"
+    private val MIGRATION = "_migration_"
+    private val MIGRATION_CONFIG = "config/migration.yml"
+
+    private fun isMigrationFile(path: Path): Boolean = path.fileName.toString().contains(MIGRATION)
+
+    private fun isTempFile(path: Path): Boolean = path.fileName.toString().startsWith(TEMP)
+
+    private fun extractOrder(fileName: String): Int {
+        val cleanName = fileName.removePrefix(TEMP)
+        return cleanName.substringBefore("_").toIntOrNull() ?: Int.MAX_VALUE
+    }
+
+    private fun toHistoryName(input: String, key: String): String {
+        val nameWithoutExt = input.substringBeforeLast(".")
+        val ext = input.substringAfterLast(".", "")
+        val base = nameWithoutExt.substringAfter("_").removeSuffix("_$key")
+        return if (ext.isNotEmpty()) "${base}_$key.$ext" else "${base}_$key"
+    }
+
+    fun renameToTemp(path: Path) {
+        val time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"))
+        val newName = "temp_${time}_${path.fileName}"
+        val target = path.parent.resolve(newName)
+//        Files.move(path, target, StandardCopyOption.ATOMIC_MOVE)
+    }
+
+    fun execute() {
+        val migrationFile = prepareFile()
+        runBlocking {
+            migrationFile.map {
+                renameToTemp(it.migrationFile)
+                val (migrationData, historyData, groupData) =
+                    prepareMigrationData(
+                        it.migrationFile.toFile(),
+                        it.historyFile?.toFile(),
+                        it.groupFile?.toFile()
+                    )
+
+                it.rootData = migrationData
+                it.historyData = historyData
+                it.groupData = groupData
+                it
+            }.forEach { fi ->
+                val finalizedData = finalizedDataService.processData(fi)
+                queueInsertMigrationData.add(fi to finalizedData)
+            }
+        }
+
+    }
+
+    private fun prepareFile(): MutableList<MigrationFileData> {
+        logger.info("migration file service started read data")
+        val migrationFileData: MutableList<MigrationFileData> = mutableListOf()
+        Files.list(Paths.get("migration")).use { stream ->
+            val tempFiles = stream
+                .filter { Files.isRegularFile(it) && isMigrationFile(it) && !isTempFile(it) }
+                .sorted(compareBy { extractOrder(it.fileName.toString()) })
+                .toList()
+
+            val additionalFiles = tempFiles.filter {
+                it.fileName.toString().contains("_history") || it.fileName.toString().contains("_group")
+            }
+
+            tempFiles.filterNot {
+                it.fileName.toString().contains("_history") || it.fileName.toString().contains("_group")
+            }.forEach {
+                val filename = it.fileName.toString()
+                val migrationTarget = filename.split("_")[2].replace(".txt", "")
+                val schema = migrationSettingService.schema.firstOrNull { fi -> fi.target == migrationTarget }
+                val fields = schema?.attribute?.let { it as MutableMap<String, String> } ?: mutableMapOf()
+                val uniqueTable = schema?.unique
+                migrationFileData.add(
+                    MigrationFileData(
+                        table = General.toTableName(migrationTarget),
+                        unique = uniqueTable,
+                        fields = fields,
+                        migrationFile = it,
+                        historyFile = additionalFiles.firstOrNull { f ->
+                            f.fileName.toString().endsWith(toHistoryName(filename, "history"))
+                        },
+                        groupFile = additionalFiles.firstOrNull { f ->
+                            f.fileName.toString().endsWith(toHistoryName(filename, "history"))
+                        }
+                    )
+                )
+            }
+        }
+
+
+        logger.info("ready to migration file [${migrationFileData.size}] -> ${migrationFileData.joinToString { it.table }}")
+        return migrationFileData
+    }
+
+    suspend fun prepareMigrationData(
+        migrationFile: File,
+        historyFile: File?,
+        groupFile: File?
+    ) = coroutineScope {
+        val migrationDeferred = async(Dispatchers.IO) { readQueryDataToMap(migrationFile) }
+        val historyDeferred = async(Dispatchers.IO) { historyFile?.let { readQueryDataToMap(it) } }
+        val groupDeferred = async(Dispatchers.IO) { groupFile?.let { readQueryDataToMap(it) } }
+
+        val migrationData = migrationDeferred.await()
+        val historyData = historyDeferred.await()
+        val groupData = groupDeferred.await()
+
+        Triple(migrationData, historyData, groupData)
+    }
+
+    private fun readQueryDataToMap(file: File): List<Map<String, Any?>> {
+        val result = measureTimedValue {
+            file.bufferedReader(StandardCharsets.UTF_8).useLines { lines ->
+                lines
+                    .filter { it.startsWith("INSERT") }
+                    .mapNotNull {
+                        try {
+                            insertSqlToMap(it)
+                        } catch (e: Exception) {
+                            logger.error("failed read data query", e)
+                            null
+                        }
+                    }
+                    .toList()
+            }
+        }
+        logger.info(
+            "migration read query data ${result.value.size} takes time " +
+                    "${result.duration.inWholeMilliseconds}ms"
+        )
+
+        return if (file.name.contains("trunk_history")) {
+            val validTrunk = result.value.filter { f -> f.get("trunk_number")?.toString()?.let { t -> t != "new trunk" } ?: true }
+            validTrunk.forEach { println(Util.mapper.writeValueAsString(it)) }
+            validTrunk
+        } else result.value
+
+    }
+
+    private fun insertSqlToMap(sql: String): Map<String, Any?> {
+        val splitSql = sql.split(") VALUES (")
+        val fieldColumn = splitSql[0].split("(").last().split(",")
+            .map { it.trim().removeSurrounding("[", "]") }
+        val valueColumn = splitValues(splitSql[1].dropLast(1))
+            .map { parseValue(it.trim()) }
+        val map: MutableMap<String, Any?> = mutableMapOf()
+        fieldColumn.mapIndexed { index, key ->
+            map[key] = valueColumn[index]
+        }
+        return map
+    }
+
+    private fun splitValues(input: String): List<String> {
+        val result = mutableListOf<String>()
+        var depth = 0
+        var inQuotes = false
+        var current = StringBuilder()
+
+        var i = 0
+        while (i < input.length) {
+            val c = input[i]
+            when (c) {
+                '\'' -> {
+                    // toggle flag ketika ada tanda kutip tunggal
+                    inQuotes = !inQuotes
+                    current.append(c)
+                }
+
+                '(' -> {
+                    if (!inQuotes) depth++
+                    current.append(c)
+                }
+
+                ')' -> {
+                    if (!inQuotes) depth--
+                    current.append(c)
+                }
+
+                ',' -> {
+                    // hanya split jika BUKAN sedang di dalam quotes dan depth = 0
+                    if (!inQuotes && depth == 0) {
+                        result.add(current.toString().trim())
+                        current = StringBuilder()
+                    } else {
+                        current.append(c)
+                    }
+                }
+
+                else -> current.append(c)
+            }
+            i++
+        }
+
+        result.add(current.toString().trim())
+        return result
+    }
+
+    private fun parseValue(raw: String): Any? {
+        val value = raw.trim()
+        if (value.equals("NULL", true)) return null
+        // CAST(x AS ...)
+        if (value.startsWith("CAST", true)) {
+            val inner = Regex("CAST\\((.*)\\s+AS", RegexOption.IGNORE_CASE)
+                .find(value)?.groupValues?.get(1)?.trim()
+            return parseValue(inner ?: value)
+        }
+        // N'string'
+        if (value.startsWith("N'") && value.endsWith("'")) {
+            return value.substring(2, value.length - 1)
+        }
+        // 'string'
+        if (value.startsWith("'") && value.endsWith("'")) {
+            return value.substring(1, value.length - 1)
+        }
+        // number
+        return value.toLongOrNull()
+            ?: value.toDoubleOrNull()
+            ?: value
+    }
+
+}

+ 92 - 0
src/main/kotlin/com/datacomsolusindo/migration/service/WorkerMigrationService.kt

@@ -0,0 +1,92 @@
+package com.datacomsolusindo.migration.service
+
+import com.datacomsolusindo.cpx_shared_code.utility.CacheData
+import com.datacomsolusindo.cpx_shared_code.utility.SimpleLogger
+import com.datacomsolusindo.cpx_shared_code.utility.Util
+import com.datacomsolusindo.migration.MigrationEntity
+import com.datacomsolusindo.migration.data.FinalizedDataService
+import com.datacomsolusindo.migration.data.InsertDataService
+import com.datacomsolusindo.migration.queueInsertData
+import com.datacomsolusindo.migration.queueInsertMigrationData
+import org.springframework.stereotype.Service
+import java.util.concurrent.Executors
+import kotlin.time.measureTimedValue
+
+
+val executorPrepareData = Executors.newSingleThreadExecutor()
+val executorInsertData = Executors.newSingleThreadExecutor()
+
+@Service
+class WorkerMigrationService(
+    private val migrationEntity: MigrationEntity,
+    private val finalizedDataService: FinalizedDataService,
+    private val insertDataService: InsertDataService
+) {
+
+    private val logger = SimpleLogger.getLogger(this::class.java)
+
+    fun startInsertWorker() {
+        executorInsertData.submit {
+            while (true) {
+                val data = queueInsertData.take()
+                try {
+                    val process = measureTimedValue { migrationEntity.insertData(data.first, data.second) }
+                    logger.info(
+                        "add data migration ${data.first.simpleName} " +
+                                "data ${data.second.size} " +
+                                "success ${data.second.size - process.value.size} " +
+                                "failed ${process.value.size} " +
+                                "takes time ${process.duration.inWholeMilliseconds}ms"
+                    )
+                    CacheData.update(data.first.simpleName, false)
+                } catch (ex: Exception) {
+                    logger.error("Insert failed", ex)
+                }
+            }
+        }
+    }
+
+    fun queueProcessingInsertDataMigration() {
+        executorInsertData.submit {
+            while (true) {
+                val migrationData = queueInsertMigrationData.take()
+                val attribute = migrationData.first
+                if (attribute.table == "calltransaction") {
+                    insertDataService.updateRawData()
+                }
+                val data = migrationData.second//.take(100)
+                try {
+                    val chunkData = data.chunked(500)
+                    logger.info("migration table ${attribute.table} with total data ${data.size} and chunk data ${chunkData.size}")
+                    chunkData.forEach {
+                        var success = 0
+                        val process = measureTimedValue {
+                            it.let { dt ->
+                                if (dt.first().keys.any { a -> a == "structure" }) {
+                                    dt.sortedBy { it["structure"].toString().length }
+                                } else dt
+                            }.forEach { ch ->
+                                val isSuccess = finalizedDataService.buildToInsertData(
+                                    attribute.table,
+                                    ch,
+                                    attribute.unique
+                                )
+                                success += isSuccess
+                            }
+                        }
+                        logger.info(
+                            "add data migration ${attribute.table} " +
+                                    "data ${it.size} " +
+                                    "success $success " +
+                                    "failed ${it.size - success} " +
+                                    "takes time ${process.duration.inWholeMilliseconds}ms"
+                        )
+                    }
+                } catch (ex: Exception) {
+                    logger.error("Insert failed", ex)
+                }
+            }
+        }
+    }
+
+}