[Java log] P3: Logging asynchronously to the database using ExecutorService

Articles published from blog Tung Huynh, was the consent of the author.

Hello everyone, in 2 previously on logging
[Java log] P1: The importance of logging in software development
[Java log] P2: Log4j integrated into the software
himself mentioned the importance of logging, some principles when logging, and instructions on how to integrate Log4j in a java program.
This article I will guide you to create a feature logging asynchronously at database.
For logging into the database features are Log4j fully capable to meet this available. But in this section I will guide how to create a module logging into the database so that you can better understand logging mechanism asynchronous.
First, to log on database there should be table save log, and create a sequence to get the value for the ID of the log table

CREATE TABLE LOGS
( 
  LOGS_ID               NUMBER NOT NULL PRIMARY KEY,
  LEVEL_LOG             VARCHAR2(10),
  CREATE_TIME           DATE,
  CONTENT               CLOB
);
/
CREATE SEQUENCE LOGS_SEQ;

On application, I will create a entity corresponding to table was created to facilitate the insert Data Sheet log.

package net.tunghuynh.logging.core;
 
public class Logs {
    private Integer logsId;
    private String levelLog;
    private String content;
 
    public Integer getLogsId(){
        return logsId;
    }
    
    public void setLogsId(Integer logsId){
        this.logsId = logsId;
    }
    
    public String getLevelLog() {
        return levelLog;
    }
 
    public void setLevelLog(String levelLog) {
        this.levelLog = levelLog;
    }
 
    public String getContent() {
        return content;
    }
 
    public void setContent(String content) {
        this.content = content;
    }
}

If normal logging purposes only succeed, you absolutely can make a direct statement insert into log data into the table in the following way

public static void main(String[] args) {
    Logs logs = new Logs();
    logs.setLevelLog("INFO");
    logs.setContent("Log by insert into database");
    new Main().save(logs);
}
public void save(Logs item) {
    PreparedStatement stmt = null;
    Connection con = null;
    try {
        con = DatabaseUtils.getConnection();
        stmt = con.prepareStatement("INSERT INTO LOGS (LOGS_ID, LEVEL_LOG, CONTENT, CREATE_TIME)" +
                " VALUES(LOGS_SEQ.NEXTVAL, ?, ?, SYSDATE)");
        stmt.setString(1, item.getLevelLog());
        stmt.setString(2, item.getContent());
        stmt.execute();
    } catch (Exception ex) {
        logger.error(ex.getMessage(), ex);
    } finally {
        DatabaseUtils.closeObject(stmt);
        DatabaseUtils.closeObject(con);
    }
}

Thought so, I've split the code on the handle outside the scope of the article as database connection, closed connection,... at 1 Other class named DatabaseUtils to avoid diluting the article. You can see the details of that part of source full at the end!.
But like his previous post mentioned principles when logging is not affect the time treatment of primary services. So have committed code on this principle, because it can work connect database and execute command insert the other will occupy 1 certain part time.
To fix this we need to handle the database and execute connect insert in 1 thread private. The simplest is new Thread New to execute Runnable.

Thread saveLog = new Thread(new Runnable() {
    @Override
    public void run() {
        new Main().save(item);
    }
});
saveLog.start();

But this way will encounter a problem fairly danger that is when the business function is called continuous, resulted in the need to create too more Thread in apps not liberated get, This will affect a lot to performance the application and impact of both the server. Therefore you should use a form pool to manage and limit birth Thread bluff above. Here, I usually use ExecutorService to undertake this work, when or anywhere needed, just log submit login and racking for boys ExecutorService handle logging gradually, so will not affect the processing time of the primary services.
First I created a abstract class Task to management list of objects log necessary save at database, abstract class Task get implements Callable to execute major.

package net.tunghuynh.logging.core;
 
import java.util.List;
import java.util.concurrent.Callable;
 
public abstract class Task<E> implements Callable<Integer> {
    private List<E> items;
    public void setItems(List<E> items){
        this.items = items;
    }
    public List<E> getItems(){
        return items;
    }
}

Then I created a abstract class ThreadManager to declare and define the framework for the process of logging.

package net.tunghuynh.logging.core;
 
import org.apache.logging.log4j.Logger;
 
import java.util.ArrayList;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
 
public abstract class ThreadManager {
    public final int BATCH_SIZE = 10;//Chương trình sẽ thực hiện doProcess khi hàng đợi vượt quá 10 phần tử
    public final long WAIT_TIME_OUT = 1000; //ms
    public final long TIME_OUT = 2 * 1000; //ms. Chương trình sẽ thực hiện doProcess với chu kỳ 2 giây 
    private final Logger logger = org.apache.logging.log4j.LogManager.getLogger(ThreadManager.class);
    private final BlockingQueue sourceQueue = new LinkedBlockingQueue();
    protected ArrayList items = new ArrayList(BATCH_SIZE);
    protected AtomicBoolean shouldWork = new AtomicBoolean(true);
    protected AtomicBoolean isRunning = new AtomicBoolean(true);
    private boolean listening = false;
    private String name = "DB LOGGER";
    protected ExecutorService executorService = Executors.newFixedThreadPool(5);
    private Thread mainThread;
 
    public ThreadManager() {
        logger.debug("Start task manager named: " + name);
        mainThread = new Thread(new Runnable() {
            @Override
            public void run() {
                logger.info("Queued job manager " + name + " is running and watching for queue... ");
                isRunning.set(true);
                int recNum = 0;
                long lgnStart = System.currentTimeMillis();
                while (shouldWork.get()) {
                    try {
                        Object item = sourceQueue.poll(WAIT_TIME_OUT, TimeUnit.MILLISECONDS);
                        if (item != null) {
                            items.add(item);
                            recNum++;
                        }
 
                        if (recNum >= BATCH_SIZE || timedOut(lgnStart)) {
                            if (items.size() > 0) {
                                logger.info(String.format("Thread %s: %s submits %d item(s)",
                                        Thread.currentThread().getName(), name, items.size()));
                                doProcess(items);
                                items = new ArrayList(BATCH_SIZE);
                                lgnStart = System.currentTimeMillis();
                                recNum = 0;
                            }
                        }
                    } catch (Exception e) {
                        logger.error(e.getMessage());
                    }
                    isRunning.set(false);
                }
                logger.info("Taskmanager " + name + " is stopped!!");
            }
 
            private boolean timedOut(Long startTime) {
                return System.currentTimeMillis() - startTime > TIME_OUT;
            }
        });
 
    }
 
    /**
     * abstract method xử lý nghiệp vụ
     * @param items
     */
    public abstract void doProcess(ArrayList items);
 
    /**
     * Bắt đầu lắng nghe dữ liệu cần xử lý từ hàng đợi
     */
    public synchronized void listen() {
        if (!listening) {
            mainThread.start();
            listening = true;
        }
    }
 
    public BlockingQueue getSourceQueue() {
        return sourceQueue;
    }
 
    public void stop() {
        logger.info(String.format("%s received a termination signal, stopping ... ", name));
        this.shouldWork.set(false);
        int tryTime = 0;
        while (isRunning.get() &amp;&amp; tryTime < 50) {
            try {
                Thread.currentThread().sleep(50L);
            } catch (Exception ex) {
 
            }
            tryTime++;
        }
    }
 
    /**
     * Submit một đối tượng cần xử lý vào hàng đợi
     * @param item
     */
    public void submit(Object item) {
        sourceQueue.offer(item);
    }
}

This section will be relatively complicated for you who have not been exposed much to handlemultithreading inJava Core. I would describe the past as follows:
-BlockingQueue sourceQueue: queue up objectslog necessaryinsert, outside adds an element to the queue via methodsubmit()
-ExecutorService executorService: Service implementation, has been fixed configuration only allows creation5 Threadat the same time, despite being called several times, only the maximum 5 thread is created, avoid creating too much influence threadperformance server
-ArrayList items: list to save the object is removed from the queuesourceQueue givedoProcess toinsert atdatabase
-Stream Processor: here you have 1thread was created andstart 1 the only time in the methodlisten(), This thread will exist during the running of applications. Inside handle the continuous threadread data from the queuesourceQueue to add to the listitems. The important thing here is to satisfy 1 in 2 condition: listitems exceedBATCH_SIZE or the elements are stored in a list of items tooTIME_OUT would pushitems goinsert. Conditions toavoid the insert too muchrecords 1 at, andavoidstoring data in fulltoo long without insert. After sendingdoProcess mustclearlistitems andreset variable timertimeout. You can see the photo below to visualize easier.

After creating 2 abstract general above, I will create the class handling logging.
The first is class LogThread get extends Task, task insert 1 List Subjects Logs at database. This section is similar to the above example but the other is insert 1 only list.

package net.tunghuynh.logging.core;
 
import net.tunghuynh.preparestatement.DatabaseUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
 
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;
 
public class LogThread extends Task {
    private final Logger logger = LogManager.getLogger(LogThread.class);
 
    @Override
    public Integer call() throws Exception {
        List lstLog = getItems();
        try {
            if (lstLog != null &amp;&amp; !lstLog.isEmpty()) {
                save(lstLog);
            }
        } catch (Exception e) {
            logger.error(e.toString(), e);
            return 0;
        }
        return 1;
    }
 
    public void save(List<Logs> lst) {
        PreparedStatement stmt = null;
        Connection con = null;
        try {
            con = DatabaseUtils.getConnection();
            stmt = con.prepareStatement("INSERT INTO AD_SCHEDULE_LOG " +
                    " (LOGS_ID, LEVEL_LOG, CONTENT, CREATE_TIME)" +
                    " VALUES(LOGS_SEQ.NEXTVAL, ?, ?, SYSDATE)");
            for(Logs item : lst){
                stmt.setString(1, item.getLevelLog());
                stmt.setString(2, item.getContent());
                stmt.execute();
            }
        } catch (Exception ex) {
            logger.error(ex.getMessage(), ex);
        } finally {
            DatabaseUtils.closeObject(stmt);
            DatabaseUtils.closeObject(con);
        }
    }
}

Next I will create class LogManager get extends ThreadManager, mission is to get the list of objects to keep logs from queue in ThreadManager to submit at ExecutorService.
This passage may have 1 Some wonder why you do not use insert theo lô. Because in ThreadManager Read from the queue out, and a maximum of only 10 element is pushed into the list to doProcess, means essentially is batch and do not need treatment here anymore.

package net.tunghuynh.logging.core;
 
import java.util.ArrayList;
 
public class LogManager extends ThreadManager {
    @Override
    public void doProcess(ArrayList items) {
        LogThread logThread = new LogThread();
        logThread.setItems(items);
        executorService.submit(logThread);
    }
}

Come here to make you well imagine, when incurred other types of logs to keep on other table, or send qua FTP,... or any other task necessary for handling asynchronous it can be reused abstract class ThreadManager framework, just redefine other similar class LogThread to handle major business and LogManager to read data from the queue. So you have a basic process framework.
Finally, to use the asynchronous log into the database created above, I write 1 segment testin Main.java

package net.tunghuynh.logging;
 
import net.tunghuynh.logging.core.Logs;
import net.tunghuynh.preparestatement.DatabaseUtils;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
 
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
 
public class Main {
    static Logger logger = LogManager.getLogger(Main.class);
    public static void main(String[] args) {
        //Tạo đối tượng test để ghi log
        Logs logs = new Logs();
        logs.setLevelLog("INFO");
        logs.setContent("Content");
        long start = new Date().getTime();
        //Khởi tạo LogManager để ghi log bất đồng bộ
        net.tunghuynh.logging.core.LogManager logManager = new net.tunghuynh.logging.core.LogManager();
        logManager.listen();
        //Submit vào hàng đợi
        logManager.submit(logs);
        logger.info("Async: " + (new Date().getTime()-start) + "ms");
        start = new Date().getTime();
        //Ghi log không dùng thread
        new Main().save(logs);
        logger.info("No-async: " + (new Date().getTime()-start) + "ms");
    }
    public void save(Logs item) {
        PreparedStatement stmt = null;
        Connection con = null;
        try {
            con = DatabaseUtils.getConnection();
            stmt = con.prepareStatement("INSERT INTO LOGS (LOGS_ID, LEVEL_LOG, CONTENT, CREATE_TIME)" +
                    " VALUES(LOGS_SEQ.NEXTVAL, ?, ?, SYSDATE)");
            stmt.setString(1, item.getLevelLog());
            stmt.setString(2, item.getContent());
            stmt.execute();
        } catch (Exception ex) {
            logger.error(ex.getMessage(), ex);
        } finally {
            DatabaseUtils.closeObject(stmt);
            DatabaseUtils.closeObject(con);
        }
    }
}

In this test period alone combines writing do not use thread Initial and writing asynchronous New construction for measuring the processing time
Try running and see results

2019-07-12 16:34:58 PM INFO  [Thread-1] net.tunghuynh.logging.core.ThreadManager.run : Queued job manager ACTION/API LOGGER is running and watching for queue... 
2019-07-12 16:34:58 PM INFO  [main] net.tunghuynh.logging.Main.main : Async: 4ms
2019-07-12 16:35:00 PM INFO  [main] net.tunghuynh.logging.Main.main : No-async: 1898ms
2019-07-12 16:35:00 PM INFO  [Thread-1] net.tunghuynh.logging.core.ThreadManager.run : Thread Thread-1: ACTION/API LOGGER submits 1 item(s)
2019-07-12 16:35:29 PM INFO  [Thread-1] net.tunghuynh.logging.core.ThreadManager.run : Queued job manager ACTION/API LOGGER is running and watching for queue... 
2019-07-12 16:35:29 PM INFO  [main] net.tunghuynh.logging.Main.main : Async: 7ms
2019-07-12 16:35:31 PM INFO  [main] net.tunghuynh.logging.Main.main : No-async: 1519ms
2019-07-12 16:35:31 PM INFO  [Thread-1] net.tunghuynh.logging.core.ThreadManager.run : Thread Thread-1: ACTION/API LOGGER submits 1 item(s)

Look we see the results different very big. With logging asynchronous the processing time only below 10 Milliseconds, because something has been running it for the other guys handle it. Still insert common they must take 1.5 to 2 seconds (longer than 500 time), very significant to use.
At the test on himself just placed into the main function to test for easy, you can init the ThreadManagerin 1 grade level static to use always function submit without having to reset multiple times, or if you use Spring should create 1 Bean give LogManager with constructor was listen() and destroy was stop() to use

@Bean(name = "logManager", initMethod = "listen", destroyMethod = "stop")
public LogManager getLogManager() {
    return new LogManager();
}

When needed Bean logManager Where this is just Autowired as always Bean Other possible uses are.

@Autowired
LogManager logManager;

To this demo, then you have created a module logging asynchronously at database adequate and safe. You can fully complement the features of logging by calling Save log API other, send log sang server other through FTP,.... instead write to database, depending on the intended use without affecting the main application.
I wish you success

Download code LogDBExample.zip