فهرست منبع

update migration service split queue

masarif 6 روز پیش
والد
کامیت
e66ee875d3

+ 1 - 1
config/general-setting.yml

@@ -182,7 +182,7 @@ redirectUrl:
 - http://localhost:4200/oauth2redirect
 - https://cmpd.telmessenger.com/oauth2redirect
 - https://app.insomnia.rest/oauth/redirect
-dataKey: jRA4iyjy1ZCaIdlJBe6Cm%2BRPGb%2BwcO85%2FWR7Z9%2B8AFVDTPuvnJwmCw6pPbMrQP92CNB%2BMwAPuisTRE8%2FccPGKB%2FuKWIvPxBslzLvfX58IuH3nZB8bPrFiSZH2n5Q7RgjOxObPvHvOIpk4r0P6%2B%2BcJw%3D%3D
+dataKey: RMyvjTZXDMBnCr3jymw9oHGUUzIJTHCULv9x%2B6xE4De56nu2OlXjkTWuk5%2B3QLfyE5PJQUBl9SIAuZOGmpIJBTHvNBFOUN%2BtwT0FzDUIKfD%2BdOcU4lJzNzo9mc2cMilw
 
 #database: 
 #  type: sqlserver

+ 5 - 2
config/migration.yml

@@ -52,6 +52,7 @@ callTransaction:
   cost: cost_1
   service: cost_service
   tax: cost_tax1
+  discount: cost_adjustment
   currency: currency
   pbx_id: pbx_code
   terminationCode: call_termination
@@ -69,14 +70,16 @@ phoneUser:
   whatsapp: phoneuser_mobile
   position: position
   emailOnOverBudget: sendemail_overbudget
-  pin: history.phoneuser_pin
+  pin: history.phon euser_pin
   extension: history.default_extension
   organization_id: history.organization_code
   costCenter_id: history.costcenter_code
   pbx_id: history.pbx_code
-  pbxList: group.pbx_list
+  pbx.list: group.pbx_list
   appliedDate: history.app_datetime
   expiredDate: history.exp_datetime
+#  budget.maxCost: history.max_cost
+#  budget.warnCost: history.warn_cost
 corcos:
   command: command
   name: short_desc

+ 3 - 0
src/main/kotlin/com/datacomsolusindo/migration/MigrationApplication.kt

@@ -2,6 +2,7 @@ package com.datacomsolusindo.migration
 
 import com.datacomsolusindo.cpx_shared_code.utility.AppListener
 import com.datacomsolusindo.cpx_shared_code.utility.SimpleLogger
+import kotlinx.coroutines.runBlocking
 import org.springframework.boot.autoconfigure.SpringBootApplication
 import org.springframework.boot.autoconfigure.domain.EntityScan
 import org.springframework.boot.builder.SpringApplicationBuilder
@@ -49,7 +50,9 @@ class AppEvent(val migrationService: MigrationService) {
     fun doSomethingAfterStartup() {
         logger.info("started service migration data with support mandiri menghidupi")
         val folder = Paths.get("migration")
+        migrationService.startInsertWorker()
         migrationService.scanAndQueue(folder)
+
 //        exitProcess(1)
     }
 

+ 163 - 71
src/main/kotlin/com/datacomsolusindo/migration/MigrationEntity.kt

@@ -19,13 +19,8 @@ import com.datacomsolusindo.cpx_shared_code.entity.Trunk
 import com.datacomsolusindo.cpx_shared_code.entity.WebUser
 import com.datacomsolusindo.cpx_shared_code.service.ApiService
 import com.datacomsolusindo.cpx_shared_code.service.CpDecrypt
-import com.datacomsolusindo.cpx_shared_code.utility.BudgetUserType
-import com.datacomsolusindo.cpx_shared_code.utility.SimpleLogger
-import com.datacomsolusindo.cpx_shared_code.utility.ToolAes
-import com.datacomsolusindo.cpx_shared_code.utility.TransferType
-import com.datacomsolusindo.cpx_shared_code.utility.collectAllField
-import com.datacomsolusindo.cpx_shared_code.utility.findId
-import com.datacomsolusindo.cpx_shared_code.utility.value
+import com.datacomsolusindo.cpx_shared_code.utility.*
+import io.azam.ulidj.ULID
 import io.github.semutkecil.simplecriteria.FilterData
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.context.annotation.Bean
@@ -34,6 +29,7 @@ import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder
 import org.springframework.security.crypto.factory.PasswordEncoderFactories
 import org.springframework.security.crypto.password.PasswordEncoder
 import org.springframework.stereotype.Service
+import org.springframework.transaction.annotation.Transactional
 import java.io.File
 import java.nio.charset.StandardCharsets
 import java.util.UUID
@@ -42,7 +38,7 @@ import kotlin.time.measureTimedValue
 
 
 @Service
-class MigrationEntity(val passwordEncoder: PasswordEncoder) {
+class MigrationEntity(val passwordEncoder: PasswordEncoder, val queryNativeService: QueryNativeService) {
 
     @Autowired
     lateinit var apiService: ApiService
@@ -67,13 +63,33 @@ class MigrationEntity(val passwordEncoder: PasswordEncoder) {
             val rootData = readQueryDataToMap(rootFile)
             val mapRootData = rootData.map { map ->
                 val data: MutableMap<String, Any?> = mutableMapOf()
-                fields.forEach { (t, u) ->
-                    data[t] = when {
-                        u.startsWith("history.") -> getValueAnotherFile(fields, historyData, unique, data, t, u, "history")
-                        u.startsWith("group.") -> getValueAnotherFile(fields, groupData, unique, data, t, u, "group")
-                        else -> map[u]
+                val fieldRoots = fields.toList().filterNot { f -> f.second.contains(".") }
+                val joinRoots = fields.toList().filter { f -> f.second.contains(".") }
+
+                fieldRoots.forEach { f ->
+                    data[f.first] = map[f.second]
+                }
+                joinRoots.sortedByDescending { it.second }.forEach { f ->
+                    data[f.first] = when {
+                        f.second.startsWith("history.") ->
+                            getValueAnotherFile(fields, historyData, unique, data, f.first, f.second, "history")
+
+                        f.second.toString().startsWith("group.") ->
+                            getValueAnotherFile(fields, groupData, unique, data, f.first, f.second, "group")
+
+                        else -> map[f.second]
                     }
                 }
+
+//                fields.forEach { (t, u) ->
+//                    data[t] = when {
+//                        u.startsWith("history.") ->
+//                            getValueAnotherFile(fields, historyData, unique, data, t, u, "history")
+//
+//                        u.startsWith("group.") -> getValueAnotherFile(fields, groupData, unique, data, t, u, "group")
+//                        else -> map[u]
+//                    }
+//                }
                 data
             }
             mapRootData
@@ -95,6 +111,7 @@ class MigrationEntity(val passwordEncoder: PasswordEncoder) {
             if (it.any { a -> a.key == fields[unique] }) unique else "id"
         } ?: "id"
         val fieldUnique = fields[uniqueField]!!.split(".").last()
+
         val value = dataFile?.firstOrNull { f ->
             f[fieldUnique].toString() == rootData[uniqueField].toString()
         }?.get(valRoot.replace("${key}.", ""))
@@ -102,7 +119,7 @@ class MigrationEntity(val passwordEncoder: PasswordEncoder) {
         return if (keyRoot.contains("_")) value?.let { id ->
             dataFile.firstOrNull { f ->
                 f[fields["id"]!!.split(".").last()].toString() == id.toString()
-            }?.get(fields["code"])
+            }?.get(fields["code"]) ?: value
         } else value
     }
 
@@ -188,19 +205,18 @@ class MigrationEntity(val passwordEncoder: PasswordEncoder) {
         rootFile: File,
         historyFile: File?,
         groupFile: File?
-    ) {
-        failed.clear()
+    ): List<MutableMap<String, Any?>> {
         val data = dataToMap(clazz, fields, "code", rootFile, historyFile, groupFile)
-
-        val process = measureTime { insertData(clazz, data) }
-
-        logger.info(
-            "finished process migration ${clazz.simpleName} " +
-                    "data ${data.size} " +
-                    "success ${data.size - failed.size} " +
-                    "failed ${failed.size} " +
-                    "takes time ${process.inWholeMilliseconds}ms"
-        )
+        return data
+
+//        val process = measureTime { insertData(clazz, data) }
+//        logger.info(
+//            "finished process migration ${clazz.simpleName} " +
+//                    "data ${data.size} " +
+//                    "success ${data.size - failed.size} " +
+//                    "failed ${failed.size} " +
+//                    "takes time ${process.inWholeMilliseconds}ms"
+//        )
     }
 
     fun clazzEntity(migrationTarget: String): Class<out BaseEntity>? {
@@ -222,7 +238,8 @@ class MigrationEntity(val passwordEncoder: PasswordEncoder) {
         }
     }
 
-    private fun <T : BaseEntity> insertData(clazz: Class<T>, dataMap: List<MutableMap<String, Any?>>) {
+    fun <T : BaseEntity> insertData(clazz: Class<T>, dataMap: List<MutableMap<String, Any?>>): MutableList<Any> {
+        failed.clear()
         val sortingData = if (dataMap.any { it.any { a -> a.key == "structure" } }) {
             dataMap.sortedBy { f -> f["structure"].toString().length }
         } else dataMap
@@ -235,7 +252,7 @@ class MigrationEntity(val passwordEncoder: PasswordEncoder) {
                 // phoneUserPbx
                 val phoneUserPbxIds: MutableList<Any> = mutableListOf()
                 val data = if (clazz.simpleName == "PhoneUser") {
-                    finalizer["pbxList"]?.toString()?.let {
+                    finalizer["pbx.list"]?.toString()?.let {
                         it.split(";").forEach { fi ->
                             findId(Pbx::class.java, fi)?.let { id -> phoneUserPbxIds.add(id) }
                         }
@@ -246,49 +263,70 @@ class MigrationEntity(val passwordEncoder: PasswordEncoder) {
                         }
                     }
                     val finalMap = finalizer.filterNot { fi ->
-                        listOf("pbxList", "pbx_id", "extension", "pin").any { a -> a == fi.key }
+                        listOf("pbx.list", "pbx_id", "extension", "pin").any { a -> a == fi.key }
                     } as MutableMap<String, Any?>
-                    apiService.create(clazz, finalMap)
+                    queryNativeService.insertDataWithNativeQuery(clazz, finalMap)
+//                    apiService.create(clazz, finalMap)
                 } else {
-                    apiService.create(clazz, finalizer)
+//                    apiService.create(clazz, finalizer)
+                    queryNativeService.insertDataWithNativeQuery(clazz, finalizer)
                 }
 
                 // create budget
                 if (BudgetUserType.entries.any { a -> a.name.snakeToCamel() == clazz.simpleName.camelCase() }) {
-                    val id = clazz.collectAllField().findId().value(data)
-                    val res = apiService.findById(clazz, id!!, listOf("uid"))
-                    res?.get("uid")?.toString()?.let { uid ->
-                        apiService.create(
-                            Budget::class.java,
-                            mutableMapOf(
-                                "userType" to BudgetUserType.valueOf(
-                                    clazz.simpleName.camelCase().camelToSnake().uppercase()
-                                ),
-                                "userUid" to uid,
-                                "type" to "FLAT"
-                            ),
+//                    val id = clazz.collectAllField().findId().value(data)
+//                    val res = apiService.findById(clazz, id!!, listOf("uid"))
+//                    res?.get("uid")?.toString()?.let { uid ->
+//                    apiService.create(
+//                        Budget::class.java,
+//                        mutableMapOf(
+//                            "userType" to BudgetUserType.valueOf(
+//                                clazz.simpleName.camelCase().camelToSnake().uppercase()
+//                            ),
+//                            "userUid" to data,
+//                            "type" to "FLAT"
+//                        ),
+//                    )
+
+                    queryNativeService.insertDataWithNativeQuery(
+                        Budget::class.java, mutableMapOf(
+                            "userType" to BudgetUserType.valueOf(
+                                clazz.simpleName.camelCase().camelToSnake().uppercase()
+                            ).ordinal,
+                            "userUid" to data,
+                            "type" to BudgetType.FLAT.ordinal
                         )
-
-                        // create phoneUserPbx
-                        if (phoneUserPbxIds.isNotEmpty()) {
-                            phoneUserPbxIds.forEach { pbxId ->
-                                apiService.create(
-                                    PhoneUserPbx::class.java, mutableMapOf(
-                                        "pin" to phoneUserPin,
-                                        "extension" to phoneUserExtension,
-                                        "pbx_id" to pbxId,
-                                        "phoneUser_id" to id,
-                                    )
+                    )
+
+                    // create phoneUserPbx
+                    if (phoneUserPbxIds.isNotEmpty()) {
+                        phoneUserPbxIds.forEach { pbxId ->
+                            queryNativeService.insertDataWithNativeQuery(
+                                PhoneUserPbx::class.java, mutableMapOf(
+                                    "pin" to phoneUserPin,
+                                    "extension" to phoneUserExtension,
+                                    "pbx_id" to pbxId,
+                                    "phoneUser_id" to data,
                                 )
-                            }
+                            )
+//                            apiService.create(
+//                                PhoneUserPbx::class.java, mutableMapOf(
+//                                    "pin" to phoneUserPin,
+//                                    "extension" to phoneUserExtension,
+//                                    "pbx_id" to pbxId,
+//                                    "phoneUser_id" to id,
+//                                )
+//                            )
                         }
                     }
+//                    }
                 }
             } catch (e: Exception) {
                 failed.add(map)
-                logger.error("failed insert data migration", e)
+//                logger.error("failed insert data migration", e)
             }
         }
+        return failed
     }
 
     private fun finalizeMap(className: String, map: MutableMap<String, Any?>): MutableMap<String, Any?> {
@@ -305,14 +343,25 @@ class MigrationEntity(val passwordEncoder: PasswordEncoder) {
         }
 
         map["direction"]?.toString()?.let {
-            map["direction"] = it.split("").mapNotNull { m ->
-                when (m.trim()) {
-                    "C" -> "INCOMING"
-                    "G" -> "OUTGOING"
-                    "I" -> "INTERNAL"
-                    else -> null
-                }
-            }.joinToString(";")
+            map["direction"] = if (className == "transaction") {
+                it.split("").mapNotNull { m ->
+                    when (m.trim()) {
+                        "C" -> Direction.INCOMING.ordinal //"C"
+                        "G" -> Direction.OUTGOING.ordinal //"G"
+                        "I" -> Direction.INTERNAL.ordinal //"I"
+                        else -> null
+                    }
+                }.joinToString(";")
+            } else {
+                it.split("").mapNotNull { m ->
+                    when (m.trim()) {
+                        "C" -> "C"
+                        "G" -> "G"
+                        "I" -> "I"
+                        else -> null
+                    }
+                }.joinToString(";")
+            }
         }
 
         map.filterNot { it.key == "id" || it.key == "structure" }.forEach { (t, u) ->
@@ -347,10 +396,10 @@ class MigrationEntity(val passwordEncoder: PasswordEncoder) {
             val to = mapFinalize["extTransferTo"]?.toString() ?: ""
             val from = mapFinalize["extTransferFrom"]?.toString() ?: ""
             mapFinalize["transferType"] = when {
-                to.isBlank() && from.isBlank() -> TransferType.DIRECT
-                to.isNotBlank() && from.isBlank() -> TransferType.TRANSFER_TO
-                to.isBlank() && from.isNotBlank() -> TransferType.TRANSFER_FROM
-                else -> TransferType.TRANSFER_FROM_AND_TRANSFER_TO
+                to.isBlank() && from.isBlank() -> TransferType.DIRECT.ordinal
+                to.isNotBlank() && from.isBlank() -> TransferType.TRANSFER_TO.ordinal
+                to.isBlank() && from.isNotBlank() -> TransferType.TRANSFER_FROM.ordinal
+                else -> TransferType.TRANSFER_FROM_AND_TRANSFER_TO.ordinal
             }
         }
 
@@ -368,9 +417,9 @@ class MigrationEntity(val passwordEncoder: PasswordEncoder) {
     private val defaultProfile: Any? by lazy {
         apiService.findListPage(
             Rights::class.java,
-            listOf("id"),
+            listOf("uid"),
             FilterData.filter("name", FilterData.FILTEROP.EQ, "Default")
-        ).firstOrNull()?.get("id")
+        ).firstOrNull()?.get("uid")
     }
 
     private fun toSnakeCase(input: String): String {
@@ -381,7 +430,7 @@ class MigrationEntity(val passwordEncoder: PasswordEncoder) {
 
     private fun <T : BaseEntity> findId(clazz: Class<T>, value: Any): String? {
         return try {
-            val query = "SELECT id FROM ${toSnakeCase(clazz.simpleName)} WHERE code = :code "
+            val query = "SELECT uid FROM ${toSnakeCase(clazz.simpleName)} WHERE code = :code "
             val id = apiService.em.createNativeQuery(query, String::class.java)
                 .setParameter("code", value)
                 .singleResult as String
@@ -432,4 +481,47 @@ class SecurityConfig {
     @Bean
     fun passwordEncoder(): PasswordEncoder = PasswordEncoderFactories.createDelegatingPasswordEncoder()
 
+}
+
+@Service
+@Transactional
+class QueryNativeService(val apiService: ApiService) {
+
+    fun <T> insertDataWithNativeQuery(clazz: Class<T>, map: MutableMap<String, Any?>): String? {
+        val uid = ULID.random()
+        val fields = mutableListOf("uid")
+        map.keys.forEach {
+            val t = it.replace("_id", "_uid")
+            fields.add(t)
+        }
+        val structure = map["parent_id"]?.toString()?.let {
+            fields.add("structure")
+            "${EntityUtility(apiService, Organization::class.java).parentStructure(it)}|$uid"
+        }
+
+        val tableName = when (clazz.simpleName.lowercase()) {
+            "phoneuserpbx" -> "phoneuserpbx"
+            "transaction" -> {
+                fields.remove("uid")
+                "calltransaction"
+            }
+
+            else -> clazz.simpleName.camelToSnake().lowercase()
+        }
+
+        val query = "INSERT INTO $tableName (${fields.joinToString() { it.camelToSnake() }}) " +
+                "VALUES (${fields.joinToString() { ":$it" }})"
+
+        val sqlNative = apiService.em.createNativeQuery(query)
+
+        if (clazz.simpleName.lowercase() != "transaction") {
+            sqlNative.setParameter("uid", uid)
+        }
+
+        structure?.let { sqlNative.setParameter("structure", structure) }
+        map.forEach { (t, u) -> sqlNative.setParameter(t.replace("_id", "_uid"), u) }
+        sqlNative.executeUpdate()
+        return uid
+    }
+
 }

+ 48 - 32
src/main/kotlin/com/datacomsolusindo/migration/MigrationService.kt

@@ -1,21 +1,31 @@
 package com.datacomsolusindo.migration
 
+import com.datacomsolusindo.cpx_shared_code.entity.BaseEntity
+import com.datacomsolusindo.cpx_shared_code.entity.PhoneUser
 import com.datacomsolusindo.cpx_shared_code.utility.SimpleLogger
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.Dispatchers
-import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.launch
 import org.springframework.beans.factory.config.YamlPropertiesFactoryBean
 import org.springframework.core.io.FileSystemResource
 import org.springframework.stereotype.Service
+import java.io.File
 import java.nio.file.Files
 import java.nio.file.Path
 import java.nio.file.StandardCopyOption
 import java.time.LocalDateTime
 import java.time.format.DateTimeFormatter
 import java.util.concurrent.Executors
+import java.util.concurrent.LinkedBlockingQueue
+import kotlin.time.measureTimedValue
 import kotlin.use
 
+data class PreparedMigration(
+    val clazz: Class<out BaseEntity>,
+    val fields: Map<String, String>,
+    val rootFile: File,
+    val historyFile: File?,
+    val groupFile: File?
+)
+
+
 @Service
 class MigrationService(val migrationEntity: MigrationEntity) {
 
@@ -24,7 +34,9 @@ class MigrationService(val migrationEntity: MigrationEntity) {
     private val MIGRATION = "_migration_"
     private val MIGRATION_CONFIG = "config/migration.yml"
 
-    private val executor = Executors.newSingleThreadExecutor()
+    private val executorPrepareData = Executors.newSingleThreadExecutor()
+    private val executorInsertData = Executors.newSingleThreadExecutor()
+    private val queueInsertData = LinkedBlockingQueue<Pair<Class<out BaseEntity>, List<MutableMap<String, Any?>>>>(20)
 
     fun isMigrationFile(path: Path): Boolean = path.fileName.toString().contains(MIGRATION)
 
@@ -67,7 +79,7 @@ class MigrationService(val migrationEntity: MigrationEntity) {
 
             logger.info("register migration data ${migrationFiles.size} module")
             migrationFiles.forEach { mi ->
-                executor.submit {
+                executorPrepareData.submit {
                     val filename = mi.fileName.toString()
                     val migrationTarget = filename.split("_")[2].replace(".txt", "")
                     val fields = loadYamlAsMap(migrationTarget)
@@ -85,12 +97,14 @@ class MigrationService(val migrationEntity: MigrationEntity) {
                             }?.let { addFile -> renameToTemp(addFile) }
                         } else null
                         migrationEntity.clazzEntity(migrationTarget)?.let { clazz ->
-                            migrationEntity.execute(
-                                clazz,
-                                fields,
-                                rootFile.toFile(),
-                                historyFile?.toFile(),
-                                groupFile?.toFile()
+                            queueInsertData.put(
+                                clazz to migrationEntity.execute(
+                                    clazz,
+                                    fields,
+                                    rootFile.toFile(),
+                                    historyFile?.toFile(),
+                                    groupFile?.toFile()
+                                )
                             )
                         }
                     }
@@ -99,6 +113,27 @@ class MigrationService(val migrationEntity: MigrationEntity) {
         }
     }
 
+    fun startInsertWorker() {
+
+        executorInsertData.submit {
+            while (true) {
+                val data = queueInsertData.take()
+                try {
+                    val process = measureTimedValue { migrationEntity.insertData(data.first, data.second) }
+                    logger.info(
+                        "finished process migration ${data.first.simpleName} " +
+                                "data ${data.second.size} " +
+                                "success ${data.second.size - process.value.size} " +
+                                "failed ${process.value.size} " +
+                                "takes time ${process.duration.inWholeMilliseconds}ms"
+                    )
+                } catch (ex: Exception) {
+                    logger.error("Insert failed", ex)
+                }
+            }
+        }
+    }
+
     fun loadYamlAsMap(prefix: String): Map<String, String> {
         val factory = YamlPropertiesFactoryBean()
         factory.setResources(FileSystemResource(MIGRATION_CONFIG))
@@ -111,23 +146,4 @@ class MigrationService(val migrationEntity: MigrationEntity) {
             .mapValues { it.value.toString() }
     }
 
-}
-
-//object TaskQueue {
-//    private val scope = CoroutineScope(Dispatchers.IO)
-//    private val channel = Channel<suspend () -> Unit>(Channel.UNLIMITED)
-//
-//    init {
-//        scope.launch {
-//            for (task in channel) {
-//                task()
-//            }
-//        }
-//    }
-//
-//    fun submit(task: suspend () -> Unit) {
-//        scope.launch {
-//            channel.send(task)
-//        }
-//    }
-//}
+}