package com.datacomsolusindo.migration.service

import com.datacomsolusindo.cpx_shared_code.utility.CacheData
import com.datacomsolusindo.cpx_shared_code.utility.SimpleLogger
import com.datacomsolusindo.migration.MigrationEntity
import com.datacomsolusindo.migration.data.FinalizedDataService
import com.datacomsolusindo.migration.data.InsertDataService
import com.datacomsolusindo.migration.model.MigrationFileData
import com.datacomsolusindo.migration.queueInsertData
import org.springframework.stereotype.Service
import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue
import kotlin.io.path.name
import kotlin.system.exitProcess
import kotlin.time.measureTimedValue

/** Single-thread executor for data preparation. No task is submitted to it from this file. */
val executorPrepareData = Executors.newSingleThreadExecutor()

/** Single-thread executor on which [WorkerMigrationService] runs its insert loops. */
val executorInsertData = Executors.newSingleThreadExecutor()

// NOTE(review): the generic type arguments of the two queues below were missing from the reviewed
// snapshot of this file. They are reconstructed from how the elements are used further down
// (`first.table`, `first.unique`, `first.migrationFile`, rows indexed by "structure") and from the
// otherwise unused MigrationFileData import. Confirm them against the producers before merging.

/** Bounded queue (capacity 100) for data awaiting preparation. It is not consumed in this file. */
val queuePrepareMigrationData = LinkedBlockingQueue<MigrationFileData>(100)

/**
 * Bounded queue (capacity 100) drained by [WorkerMigrationService.queueProcessingInsertDataMigration].
 * Each element pairs the description of a migration file with the rows to insert for its table.
 */
val queueInsertMigrationData = LinkedBlockingQueue<Pair<MigrationFileData, List<Map<String, Any?>>>>(100)

/**
 * Starts the background loops that drain the migration queues and write their content through
 * [MigrationEntity], [InsertDataService] and [FinalizedDataService].
 *
 * NOTE(review): both loops are submitted to [executorInsertData], which is a single-thread
 * executor, and neither loop ever returns. If both methods are called, only the loop submitted
 * first will run. Confirm that only one of them is started per process, or give each loop its
 * own executor.
 */
@Service
class WorkerMigrationService(
    private val migrationEntity: MigrationEntity,
    private val finalizedDataService: FinalizedDataService,
    private val insertDataService: InsertDataService
) {
    private val logger = SimpleLogger.getLogger(this::class.java)

    /**
     * Submits an endless loop that takes batches from `queueInsertData`, inserts each batch with
     * [MigrationEntity.insertData], logs a summary and then calls `CacheData.update` for the
     * batch's type name.
     *
     * A failing batch is logged and skipped; the loop keeps running.
     */
    fun startInsertWorker() {
        executorInsertData.submit {
            while (true) {
                val data = queueInsertData.take()
                try {
                    val process = measureTimedValue { migrationEntity.insertData(data.first, data.second) }
                    // The summary counts the entries returned by insertData as failures.
                    logMigrationResult(
                        name = data.first.simpleName,
                        total = data.second.size,
                        success = data.second.size - process.value.size,
                        failed = process.value.size,
                        elapsedMillis = process.duration.inWholeMilliseconds
                    )
                    CacheData.update(data.first.simpleName, false)
                } catch (ex: Exception) {
                    logger.error("Insert failed", ex)
                }
            }
        }
    }

    /**
     * Submits an endless loop that takes entries from [queueInsertMigrationData] and inserts their
     * rows in chunks of 500 through [FinalizedDataService.buildToInsertData].
     *
     * After an entry has been processed without an exception, its file name is removed from
     * `runningTableMigration`; once that collection is empty the JVM is terminated.
     *
     * Any exception raised while handling an entry, including the preparation steps for the
     * "calltransaction" table, is logged and the loop continues with the next entry. Previously the
     * preparation steps ran outside the `try`, so a failure there ended the loop without any log
     * output because the exception stayed inside the Future returned by `submit`.
     */
    fun queueProcessingInsertDataMigration() {
        executorInsertData.submit {
            while (true) {
                val migrationData = queueInsertMigrationData.take()
                val attribute = migrationData.first
                val data = migrationData.second
                try {
                    if (attribute.table == "calltransaction") {
                        // These preparation steps run before the call transaction rows are inserted.
                        insertDataService.createTableCallTransactionTemporary()
                        insertDataService.updateRawData()
                        finalizedDataService.prepareDataCallTransaction()
                    }

                    val chunkData = data.chunked(500)
                    logger.info("migration table ${attribute.table} with total data ${data.size} and chunk data ${chunkData.size}")

                    chunkData.forEach { chunk ->
                        var success = 0
                        val process = measureTimedValue {
                            // When the first row of the chunk has a "structure" key, rows are inserted
                            // in ascending order of the length of that value's string form.
                            val orderedRows = if (chunk.first().keys.any { key -> key == "structure" }) {
                                chunk.sortedBy { row -> row["structure"].toString().length }
                            } else {
                                chunk
                            }
                            orderedRows.forEach { row ->
                                success += finalizedDataService.buildToInsertData(
                                    attribute.table,
                                    row,
                                    attribute.unique
                                )
                            }
                        }
                        logMigrationResult(
                            name = attribute.table,
                            total = chunk.size,
                            success = success,
                            failed = chunk.size - success,
                            elapsedMillis = process.duration.inWholeMilliseconds
                        )
                    }

                    runningTableMigration.remove(attribute.migrationFile.name)
                    if (runningTableMigration.isEmpty()) {
                        // NOTE(review): this is the normal completion path, yet exit status 1
                        // conventionally signals failure. Confirm whether 0 is intended.
                        exitProcess(1)
                    }
                } catch (ex: Exception) {
                    logger.error("Insert failed", ex)
                }
            }
        }
    }

    /** Writes the one-line summary shared by both workers. */
    private fun logMigrationResult(name: String?, total: Int, success: Int, failed: Int, elapsedMillis: Long) {
        logger.info(
            "add data migration $name " +
                "data $total " +
                "success $success " +
                "failed $failed " +
                "takes time ${elapsedMillis}ms"
        )
    }
}