Home

Java Multithread Queue Consumer

Mari berdiskusi bersama kami di Group Facebook Kurung Kurawal

Post kali ini, saya mau membahas mengenai penggunaan Queue di dalam Java. Queue, secara sederhana dapat diartikan sebagai antrian. Sebuah aplikasi yang memberikan servis, pasti akan memiliki banyak sekali hal/pekerjaan yang harus dikerjakan.

Bayangkan misalnya, seorang koki di restoran akan menerima pesanan yang lebih dari 1 satu atau banyak. Antrian pesanan tersebut jika dikerjakan oleh 1 koki, tentu akan memakan waktu lama, tergantung seberapa lama 1 pesanan atau pekerjaan dapat diselesaikan. Namun jika dikerjakan 2 koki, waktu yang dibutuhkan akan berkurang, begitu pula jika 3 koki, dan seterusnya.

Namun, apakah dengan menggunakan banyak koki akan menyelesaikan masalah? Hati hati jika ini justru memberika masalah baru, yaitu kebutuhan resources dan ruang kerja akan semakin bertambah, apalagi jika ternyata antrian pekerjaan tidak begitu banyak, koki nya malah nganggur.

Analogi hal di atas terhadap pembahasan saya kali ini adalah, anggaplah antrian pesanan itu kita sebut (class) Job, koki adalah JobWorker yang merupakan Thread di Java, sedangkan antriannya sendiri kita sebut TheQueue.

MultiThreaded Queue Consumer

Pada tampilan aplikasi, saya mendesain dengan sangat sederhana, hanya ada ada 3 tombol untuk interaksi, yaitu kurangi JobWorker, tambahkan JobWorker, dan tambahkan Job.

Ketika Tambah JobWorker diklik, anggaplah kita menambah koki baru, dan sebaliknya, ketika Kurangi JobWorker diklik, kita mengurangi jumlah koki yang bekerja. Sedangkan tambahkan Job berarti kita membuat pesanan atau pekerjaan yang harus dikerjakan koki koki kita.

Dan untuk menghindari masalah kehabisan resources komputer, saya batasi jumlah JobWorker hanya maksimal 10 saja. Daripada nge-hang? Dan alur aplikasinya membuat pesanan dengan status queue, dikerjakan dengan status onprogress, dan selesai dengan status done. Itu saja.

Demo aplikasi dapat didownload di sini.

Job

Karena ini hanya demo, maka pekerjaan disini hanya bersifat dummy saja, artinya tidak ada yang dikerjakan, namun untuk mendapatkan efek dari waktu yang dibutuhkan maka saya memberikan Thread.sleep dengan durasi secara acak.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
 * Created with IntelliJ IDEA.
 * Author: Lee
 * Date: 6/24/2014
 * Time: 3:02 PM
 * To change this template use File | Settings | File Templates.
 */
public class Job {
    public final int name;
    private String status;
 
    // this is the time needed to finish one job
    // of course this is dummy as it is used for sleeping
    int maxTime = 450;
    int minTime = 200;
    int dummyWorkingTime = (int)(Math.random() * (maxTime - minTime)) + minTime;
 
    public Job(int n){
        name = n;
        status = "Queued";
    }
 
    private void refreshGuiTable(){
        Main.getGui().refreshJobList(Job.this);
    }
 
    public String jobName(){
        return "Job" + name;
    }
 
    public String getStatus(){
        return status;
    }
 
    public void done(JobWorker w){
        status = "Done by " + w.getName() + " on " + dummyWorkingTime + "ms";
        refreshGuiTable();
    }
 
    public int onProgress(JobWorker w){
        status = "On Progress by " + w.getName();
        refreshGuiTable();
 
        Main.safeSleep(dummyWorkingTime);
 
        return dummyWorkingTime;
    }
}

TheQueue

Untuk class ini, saya sebenarnya mengandalkan class LinkedBlockingQueue<...> milik Java. Dan saya tidak akan membahas kenapa menggunakan Queue yang ini dan bukan yang itu. Pada kondisi lapangan, pilihlah jenis Queue yang sesuai dengan kebutuhan, atau mungkin buat sendiri, dengan menggunakan Interface Queue.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import javax.swing.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
 
/**
 * Created with IntelliJ IDEA.
 * Author: Lee
 * Date: 6/24/2014
 * Time: 3:08 PM
 * To change this template use File | Settings | File Templates.
 */
public class TheQueue {
    private int WORKER_LIMIT = 10;
 
    private int counter = 1;
    private LinkedBlockingQueue<Job> queue = new LinkedBlockingQueue<Job>();
    private List<JobWorker> workers;
 
    public TheQueue(){
        workers = new ArrayList<JobWorker>();
    }
 
    public void addWorker(){
        if(workers.size() >= WORKER_LIMIT){
            JOptionPane.showMessageDialog(Main.getGui(), "Telah mencapai batas / is on its limit!");
            return;
        }
 
        JobWorker w = new JobWorker(counter++, this);
        workers.add(w);
        w.start();
        updateWorkerNum();
    }
 
    public void decreaseWorker(){
        if(workers.isEmpty()){
            JOptionPane.showMessageDialog(Main.getGui(), "habis / No more Workers");
            return;
        }
 
        JobWorker w = workers.get(workers.size() - 1);
        w.stopWorking();
        workers.remove(w);
 
        updateWorkerNum();
    }
 
    public void updateWorkerNum(){
        Main.getGui().setNumWorker(workers.size());
    }
 
    public void enqueue(Job job){
        queue.offer(job);
        Main.getGui().addRow(new String[]{
                job.name + "", job.jobName(), job.getStatus()
        });
    }
 
    public Job dequeue(){
        try {
            return queue.isEmpty() ? null : queue.take();
        } catch (Exception e){
            return null;
        }
    }
}

JobWorker

Class inilah yang bertanggung jawab menghabiskan seluruh pekerjaan dalam antrian. Class ini akan selalu melihat ke antrian, apakah ada atau tidak pesanan yang belum dikerjakan. Jika ada maka selesaikan, dan kembali mengambil antrian yang tersedia.

Namun jika tidak ada, maka akan menunggu dengan Main.safeSleep(75);, jangan bingung, safeSleep sebenarnya hanya menjalankan Thread.sleep.

Kenapa harus sleep ? Karena mengantuk. Oh bukan, jika sebuah proses didalam loop dilakukan tanpa berhenti, maka kemungkinan besar komputer anda akan nge-hang, karena tidak adanya proses Context Switching pada process, atau satu core processor anda terkunci hanya untuk mengerjakan satu loop. Bayangkan jika banyak loop.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
 * Created with IntelliJ IDEA.
 * Author: Lee
 * Date: 6/24/2014
 * Time: 3:14 PM
 * To change this template use File | Settings | File Templates.
 */
public class JobWorker extends Thread implements Runnable {
    private int id;
    private boolean working;
    private TheQueue queue;
    private Job currentJob;
 
    public JobWorker(int i, TheQueue q) {
        super();
        setName("Worker" + i);
        id = i;
        working = true;
        queue = q;
    }
 
    public void stopWorking(){
        working = false;
        Main.getGui().log(getName() + " STOP WORKING");
    }
 
    public void run(){
        Main.getGui().log(getName() + " START working...");
 
        int dummyWorkingTime;
        while(working){
            currentJob = queue.dequeue();
            if(currentJob == null){
                Main.safeSleep(75);
            } else {
                Main.getGui().log(getName() + " WORKING " + currentJob.jobName());
                dummyWorkingTime = currentJob.onProgress(this);
                currentJob.done(this);
 
                Main.getGui().log(getName() + " FINISHED " + currentJob.jobName() + " on " + dummyWorkingTime + "ms");
            }
        }
 
        Main.getGui().log(getName() + " QUIT the queue");
        queue.updateWorkerNum();
    }
}

Sebagai catatan, kali ini memang saya hanya memberikan code tutorial disisi engine atau core tutorial saja.

Demikian, semoga sedikit banyak ini dapat membantu yang ingin belajar Queue dan/atau Multithreading di Java.