Resource Constraint Project Scheduling Problem

問題設定

本章ではジョブショップスケジューリング問題 (JSSP) を一般化した、資源制約付きプロジェクトスケジューリング問題の定式化について扱います。 英語の頭文字をとり RCPSP と呼ぶことにします。

RCPSP は JSSP の Machine の概念を一般し、 Resource という概念を導入します。 Job は複数種類の Resource を適当な個数使用することで処理されます。 RCPSP は以下のように定義されます。

  1. m種類の Resource \(R_1, R_2, \dots, R_m\) にはそれぞれ、使用可能個数 \(A_1, A_2, \dots, A_m\) (Availabily) が定まっている。

  2. n種類の Job \(J_1, J_2, \dots\ J_n\) にはそれぞれ、処理時間 \(D_1, \dots, D_n\) と要求する資源の個数 (Request) が定まっている。

    • Job \(J_i\) は Resource \(R_j\)\(N_{i, j}\) 個要求する。

  3. (順序制約) Job には順序関係 (Precedence relations) が定まっている。

    • Job \(J_i\) は Job \(J_{p_i(1)}, J_{p_i(2)}, \dots, J_{p_i(n_i)}\) の処理が完了してからしか処理を開始できない。

  4. (資源制約) 同時に処理する Job の要求する資源の個数はその資源の使用可能個数以下でなければならない。

  5. このとき、与えられた評価尺度を最適にするような Job の開始時刻を求める。

    • 全ての Job が完了する時刻の最小化問題を解くことが多い。

amplify_sched を用いた実装

amplify_sched で RCPSP を解きます。 PSPLIB から RCPSP の問題をダウンロードします。

PSPLIB: https://www.om-db.wi.tum.de/psplib/main.html

%%bash
# Download rcpsp j30 data from PSPLIB.
wget -q https://www.om-db.wi.tum.de/psplib/files/j30.sm.zip
mkdir -p rcpsp/j30
unzip -q -n j30.sm.zip -d rcpsp/j30

PSPLIB のパーサを実装します。パーサの実装は定式化とあまり関係がありませんので、スキップしても問題ありません。

from collections import namedtuple
from dataclasses import dataclass


@dataclass
class RCPSP:
    Pred = namedtuple("Pred", ["jobnr", "num_modes", "num_successors", "successors"])
    ReqDur = namedtuple("ReqDur", ["jobnr", "mode", "duration", "request"])
    Job = namedtuple("Job", ["num_modes", "successors", "durations", "requests"])

    # Base data
    base_data: str
    initial_value_random_generator: int
    # Overview
    projects: int
    jobs: int
    horizon: int
    renewable: int
    nonrenewable: int
    doubly_constrained: int
    # Project information
    pronr: int
    num_jobs: int
    rel_date: int
    due_date: int
    tard_cost: int
    mpm_time: int
    # Precedence relations
    pred_list: list
    # Request/Durations
    req_dur_list: list
    # Resource availabilities
    avail_list: list

    def get_job(self, jobnr: int) -> Job:
        for pred in self.pred_list:
            if pred.jobnr == jobnr:
                num_modes = pred.num_modes
                successors = pred.successors
                break
        durations = []
        requests = []
        for req_dur in self.req_dur_list:
            if req_dur.jobnr == jobnr:
                # Ignore mode
                durations.append(req_dur.duration)
                requests.append(req_dur.request)
        return RCPSP.Job(num_modes, successors, durations, requests)

    def get_jobs(self) -> dict:
        ret = dict()
        for i in range(1, 1 + self.jobs):
            ret[i] = self.get_job(i)
        return ret


def parse_rcpsp_file(file_path) -> RCPSP:
    with open(file_path) as f:
        d = f.readlines()
    for i, line in enumerate(d):
        if line.startswith("file with basedata            : "):
            base_data = line[32:]
        if line.startswith("initial value random generator: "):
            initial_value_random_generator = int(line[32:])
        if line.startswith("projects                      : "):
            projects = int(line[32:])
        if line.startswith("jobs (incl. supersource/sink ): "):
            jobs = int(line[32:])
        if line.startswith("horizon                       : "):
            horizon = int(line[32:])
        if line.startswith("  - renewable                 : "):
            renewable = int(line[32:].split()[0])
        if line.startswith("  - nonrenewable              : "):
            nonrenewable = int(line[32:].split()[0])
        if line.startswith("  - doubly constrained        : "):
            doubly_constrained = int(line[32:].split()[0])
        if line.startswith("PROJECT INFORMATION:"):
            pronr, num_jobs, rel_date, due_date, tard_cost, mpm_time = [int(x) for x in d[i + 2].split()]
        if line.startswith("PRECEDENCE RELATIONS:"):
            pred_list = list()
            sum_modes = 0
            for j in range(i + 2, i + 2 + jobs):
                tmp = d[j].split()
                jobnr = int(tmp[0])
                num_modes = int(tmp[1])
                sum_modes += num_modes
                num_successors = int(tmp[2])
                successors = list()
                if num_successors > 0:
                    successors = [int(x) for x in tmp[3:]]
                pred_list.append(RCPSP.Pred(jobnr, num_modes, num_successors, successors))
        if line.startswith("REQUESTS/DURATIONS:"):
            req_dur_list = list()
            for j in range(i + 3, i + 3 + sum_modes):
                tmp = d[j][:9].split()
                if len(tmp) > 0:
                    jobnr = int(tmp[0])
                tmp = d[j][9:].split()
                mode = int(tmp[0])
                duration = int(tmp[1])
                request = [int(x) for x in tmp[2:]]
                req_dur_list.append(RCPSP.ReqDur(jobnr, mode, duration, request))
        if line.startswith("RESOURCEAVAILABILITIES:"):
            avail_list = [int(x) for x in d[i + 2].split()]
    return RCPSP(
        base_data,
        initial_value_random_generator,
        projects,
        jobs,
        horizon,
        renewable,
        nonrenewable,
        doubly_constrained,
        pronr,
        num_jobs,
        rel_date,
        due_date,
        tard_cost,
        mpm_time,
        pred_list,
        req_dur_list,
        avail_list,
    )
# Parse file
file_path = "rcpsp/j30/j3010_1.sm"
problem = parse_rcpsp_file(file_path)

amplify_sched で RCPSP を定式化します。

amplify_sched は Job を何らかの Machine で実行することを前提としています。 RCPSP には Machine という概念はありませんので、仮想的に Jobの数だけ Machine を用意し、各 Job は対応した Machine で処理されることとします。 Machine は対応する一つの Job だけ処理します。

amplify_sched を用いると資源制約は capacity/required_resources 制約として、順序制約は dependent_jobs 制約として実装できます。

from amplify_sched import *

num_jobs = problem.jobs
num_machines = num_jobs
num_resources = problem.renewable
avail_list = problem.avail_list
jobs = problem.get_jobs()

model = amplify_sched.Model()
# Machine を仮想的に Job の数だけ追加する
for i in range(num_machines):
    model.machines.add(str(i + 1))
# Job を追加し、処理時間を設定する
for i in range(num_jobs):
    job_id = str(i + 1)
    machine_id = job_id
    model.jobs.add(job_id)
    model.jobs[job_id].append()
    assert jobs[int(job_id)].num_modes == 1
    model.jobs[job_id][0].processing_times[machine_id] = jobs[int(job_id)].durations[0]
# Resource を追加し、使用可能個数を設定する
for i in range(num_resources):
    model.resources.add(str(i + 1))
    model.resources[str(i + 1)].capacity = avail_list[i]
# 資源制約
for job_id, job in jobs.items():
    for i, request in enumerate(job.requests[0]):
        # Resource を request 回追加する
        resource_id = str(i + 1)
        for _ in range(request):
            model.jobs[str(job_id)][0].required_resources.append(resource_id)
# 順序制約
for job_id, job in jobs.items():
    for successor in job.successors:
        model.jobs[str(successor)].add_dependent_jobs(str(job_id))

上で定式化モデルをエンジンで解くと、1秒で最適解に到達することを確認できます。

token = "xxxxxxxxxxxxxxxxxxxxxxxxx"
gantt = model.solve(token=token, timeout=1)
gantt.status
'OPTIMAL'

ガントチャートを表示すると、各 Machine が一つの Job を処理していることを確認できます。

gantt.timeline(machine_view=True)

amplify_sched は Resource の使用量も簡単にプロットできます。

gantt.resource()

avail_option を使用すると、余っている Resource の量 (Capacity - Requirement のこと) もプロットできます。

ほとんどすべての時間帯で何らかの Resource の余っている量が 0 付近の値を取っており、効率的にスケジューリング出来ていることが確認できます。

gantt.resource(avail_view=True)

Multi Mode RCPSP

RCPSP の拡張として Multi Mode RCPSP があります。 Multi Mode RCPSP では Job には複数の処理モードがあり、それぞれの処理モードには処理時間と要求する資源の個数が異なります。

amplify_sched では Multi Mode RCPSP もサポートしています。

定式化は RCPSP とほとんど同じですが、各Jobには複数の処理モードがあるため、Machine を仮想的に Job とそのモードの数だけ用意し、各Jobは選択されたモードに対応したMachineで処理されることとします。

%%bash
# Download rcpsp j30 data from PSPLIB.
wget -q https://www.om-db.wi.tum.de/psplib/files/n0.mm.zip
mkdir -p rcpsp/n0
unzip -q -n n0.mm.zip -d rcpsp/n0
# Parse file
file_path = "rcpsp/n0/n048_10.mm"
problem = parse_rcpsp_file(file_path)
from amplify_sched import *

num_jobs = problem.jobs
num_resources = problem.renewable
avail_list = problem.avail_list
jobs = problem.get_jobs()

model = amplify_sched.Model()
# Machine を仮想的に Job とそのモードの数だけ追加する
for i in range(num_jobs):
    job_id = str(i + 1)
    for mode in range(jobs[int(job_id)].num_modes):
        machine_id = f"Job {job_id}: Mode {str(mode + 1)}"
        model.machines.add(machine_id)
# Job を追加し、処理時間を設定する
for i in range(num_jobs):
    job_id = str(i + 1)
    model.jobs.add(job_id)
    model.jobs[job_id].append()
    for mode in range(jobs[int(job_id)].num_modes):
        machine_id = f"Job {job_id}: Mode {str(mode + 1)}"
        model.jobs[job_id][0].processing_times[machine_id] = jobs[int(job_id)].durations[mode]
# Resource を追加し、使用可能個数を設定する
for i in range(num_resources):
    model.resources.add(str(i + 1))
    model.resources[str(i + 1)].capacity = avail_list[i]
# 資源制約
for job_id, job in jobs.items():
    for mode in range(job.num_modes):
        machine_id = f"Job {job_id}: Mode {str(mode + 1)}"
        for i, request in enumerate(job.requests[mode]):
            # Resource を request 回追加する
            resource_id = str(i + 1)
            for _ in range(request):
                model.jobs[str(job_id)][0].required_resources.append((resource_id, machine_id))
# 順序制約
for job_id, job in jobs.items():
    for successor in job.successors:
        model.jobs[str(successor)].add_dependent_jobs(str(job_id))
token = "xxxxxxxxxxxxxxxxxxxxxxxxx"
gantt = model.solve(token=token, timeout=1)
gantt.status
'OPTIMAL'
gantt.timeline(machine_view=True)
gantt.resource()
gantt.resource(avail_view=True)