Forecast Volumes Calculation (Python Implementation)

EDIT 4/26/2023 - You can now get Daily Volumes and Monthly Volumes directly from the API without having to use the following script.


For forecast segment calculation details, refer to the documentation here: Forecast segment calculation

The script below is used to generate daily or monthly forecast volumes from the forecast parameters pulled from the REST API.

Use the function forecast_params_to_volumes to generate the forecast volumes.

import numpy as np
from datetime import date
from collections import defaultdict
import json

DAYS_IN_YEAR = 365.25
BASE_DATE_NP = np.datetime64('1900-01-01')
PROB_SERIES = ['best', 'p10', 'p50', 'p90']


def pred_exp(t, t0, q0, D_exp):
    return q0 * np.exp(-D_exp * (t - t0))


def pred_arps(t, t0, q0, D, b):
    return q0 * np.power(1 + b * D * (t - t0), -1 / b)


def pred_linear(t, q0, t0, k):
    return k * (t - t0) + q0


def exp_D_eff_2_D(D_eff):
    D = -np.log(1 - D_eff) / DAYS_IN_YEAR
    return D


class SegmentParent(object):
    def __init__(self, segment=None):
        self.segment = segment


class ArpsSegment(SegmentParent):
    def predict(self, raw_t):
        t = np.array(raw_t)
        start_idx = self.segment['start_idx']
        b = self.segment['b']
        D = self.segment['D']
        q_start = self.segment['q_start']
        return pred_arps(t, start_idx, q_start, D, b)


ArpsincSegment = ArpsSegment


class ArpsModifiedSegment(SegmentParent):
    def predict(self, raw_t):
        t = np.array(raw_t)
        start_idx = self.segment['start_idx']
        sw_idx = self.segment['sw_idx']

        q_start = self.segment['q_start']
        b = self.segment['b']
        D = self.segment['D']

        q_sw = self.segment.get('q_sw')
        if not q_sw:
            q_sw = pred_arps(sw_idx, start_idx, q_start, D, b)

        D_exp = self.segment.get('D_exp')
        if not D_exp:
            D_exp = exp_D_eff_2_D(self.segment['realized_D_eff_sw'])

        ret = np.zeros(t.shape)
        range_1 = t <= sw_idx
        range_2 = t > sw_idx
        ret[range_1] = pred_arps(t[range_1], start_idx, q_start, D, b)
        ret[range_2] = pred_exp(t[range_2], sw_idx, q_sw, D_exp)
        return ret


class EmptySegment(SegmentParent):
    def predict(self, raw_t):
        t = np.array(raw_t)
        return np.zeros(t.shape)


class ExpDecSegment(SegmentParent):
    def predict(self, raw_t):
        t = np.array(raw_t)
        start_idx = self.segment['start_idx']
        q_start = self.segment['q_start']
        D = self.segment['D']
        return pred_exp(t, start_idx, q_start, D)


ExpIncSegment = ExpDecSegment


class FlatSegment(SegmentParent):
    def predict(self, raw_t):
        t = np.array(raw_t)
        q_start = self.segment['q_start']
        ret = np.ones(t.shape) * q_start
        return ret


class LinearSegment(SegmentParent):
    def predict(self, raw_t):
        t = np.array(raw_t)
        start_idx = self.segment['start_idx']
        q_start = self.segment['q_start']
        k = self.segment.get('k')
        if not k:
            end_idx = self.segment['end_idx']
            q_end = self.segment['q_end']
            k = (q_end - q_start) / (end_idx - start_idx)

        return pred_linear(t, q_start, start_idx, k)


class MultipleSegments(object):
    def __init__(self, segments=None):
        self.segment_parent = SegmentParent()
        if segments is not None:
            self.segments = [self.get_segment_object(seg) for seg in segments]

    def get_segment_object(self, segment):
        segment_dict = {
            'exp_inc': ExpIncSegment,
            'exp_dec': ExpDecSegment,
            'arps': ArpsSegment,
            'arps_inc': ArpsincSegment,
            'arps_modified': ArpsModifiedSegment,
            'flat': FlatSegment,
            'empty': EmptySegment,
            'linear': LinearSegment
        }
        return segment_dict[segment['name']](segment)

    def predict(self, raw_t, forecast_segments, to_fill=0):
        t = np.array(raw_t)
        ret = np.full(t.shape, to_fill, dtype=float)
        for seg in forecast_segments:
            this_segment_object = self.get_segment_object(seg)
            this_range = (t <= seg['end_idx']) & (t >= seg['start_idx'])
            ret[this_range] = this_segment_object.predict(t[this_range])

        return ret

    def predict_time_ratio(self, raw_t, ratio_t_segments, base_segment):
        base_pred = self.predict(raw_t, base_segment)
        ratio_pred = self.predict(raw_t, ratio_t_segments)
        return base_pred * ratio_pred


multi_seg = MultipleSegments()

name_map = {
    'diEffSec': 'D_eff',
    'diNominal': 'D',
    'endDate': 'end_idx',
    'qEnd': 'q_end',
    'qStart': 'q_start',
    'realizedDSwEffSec': 'realized_D_eff_sw',
    'segmentIndex': 'segment_idx',
    'segmentType': 'name',
    'startDate': 'start_idx',
    'swDate': 'sw_idx',
    'b': 'b',
}


def date_to_index(date_str: str):
    return (np.datetime64(date_str[:10]) - BASE_DATE_NP).astype(int)


def get_forecast_volumes(start_idx: int,
                         end_idx: int,
                         target_segments: dict,
                         base_segments: dict = None,
                         forecast_type: str = 'rate',
                         resolution: str = 'daily'):
    t = [i for i in range(start_idx, end_idx + 1)]
    index = t + BASE_DATE_NP
    if forecast_type == 'rate':
        volumes = multi_seg.predict(t, target_segments)
    else:
        volumes = multi_seg.predict_time_ratio(t, target_segments, base_segments)

    volume_type = 'daily_volumes'
    if resolution == 'monthly':
        volumes, index = sum_forecast_by_month(volumes, t)
        volume_type = 'monthly_volumes'

    return {'date': index, volume_type: volumes}


def sum_forecast_by_month(volume, daily_index):
    forecast_month = (BASE_DATE_NP + daily_index).astype('datetime64[M]')
    unique_forecast_month = np.unique(forecast_month)

    month_start = (unique_forecast_month.astype('datetime64[D]') - BASE_DATE_NP).astype(int)
    month_end = ((unique_forecast_month + 1).astype('datetime64[D]') - BASE_DATE_NP).astype(int) - 1

    month_start[0] = daily_index[0]
    month_end[-1] = daily_index[-1]

    month_end = month_end - month_start[0]
    month_start = month_start - month_start[0]

    cum_sum = np.cumsum(volume)
    cum_sum = np.concatenate([[0], cum_sum])

    grouped_volume = cum_sum[month_end + 1] - cum_sum[month_start]
    unique_forecast_month = (unique_forecast_month.astype('datetime64[D]') + 14).astype(date)

    return grouped_volume, unique_forecast_month


def format_api_segments(segments: dict):
    ret = []
    for segment in segments:
        s = {}
        for key in segment:
            if key in ['endDate', 'startDate', 'swDate']:
                s[name_map[key]] = date_to_index(segment[key])
            else:
                s[name_map[key]] = segment[key]

        ret.append(s)

    return ret


def calc_daily_volumes_from_parameters(forecast_parameters: defaultdict(dict), resolution: str = 'daily'):
    ret = defaultdict(lambda: defaultdict(dict))

    for well in forecast_parameters:
        well_parameters = forecast_parameters[well]
        for phase in well_parameters:
            if not well_parameters[phase]:
                ret[well][phase] = {}
                continue

            forecast_type = 'rate'
            if 'ratio' in well_parameters[phase]:
                forecast_type = 'ratio'

            if forecast_type == 'rate':
                for series in PROB_SERIES:
                    if series in well_parameters[phase]:
                        series_parameters = format_api_segments(well_parameters[phase][series]['segments'])
                        series_volumes = get_forecast_volumes(series_parameters[0]['start_idx'],
                                                              series_parameters[-1]['end_idx'], series_parameters, None,
                                                              'rate', resolution)
                        ret[well][phase][series] = series_volumes
            else:
                base_phase = well_parameters[phase].get('ratio', {}).get('basePhase')
                if not base_phase or not well_parameters.get(base_phase) or 'ratio' in well_parameters.get(base_phase):
                    ret[well][phase] = {}
                    continue
                base_phase_parameters = format_api_segments(well_parameters[base_phase].get('best',
                                                                                            {}).get('segments', {}))
                ratio_phase_parameters = format_api_segments(well_parameters[phase].get('ratio',
                                                                                        {}).get('segments', {}))
                ratio_volumes = get_forecast_volumes(ratio_phase_parameters[0]['start_idx'],
                                                     ratio_phase_parameters[-1]['end_idx'], ratio_phase_parameters,
                                                     base_phase_parameters, 'ratio', resolution)
                ret[well][phase]['ratio'] = ratio_volumes
                ret[well][phase]['base_phase'] = base_phase

    return ret


# this json data gets from get-forecast-outputs rest-api
def read_json_file_from_api(file_name: str):
    with open(file_name, 'r') as f:
        json_data = json.load(f)
    return json_data


'''
forecast_parameters_format = {'well_id':{

    'oil':[{},{},...],
    'gas':[{}],
    'water':[{}],
    }
}

'''


def format_json_data(json_data: dict):
    res = defaultdict(dict)
    for well_forecast in json_data:
        res[well_forecast['well']][well_forecast['phase']] = well_forecast

    return res


'''
Use the codes below to load and format JSON data from rest-api,
and then generate the daily volumes
'''


def forecast_params_to_volumes(file_path: str, resolution: str = 'daily'):

    # load and format json forecast parameters data
    forecast_params = format_json_data(read_json_file_from_api(file_path))

    # get forecast volumes from formatted data
    forecast_volumes = calc_daily_volumes_from_parameters(forecast_params, resolution)

    return forecast_volumes


'''
Examples to use this script:

monthly_volumes = forecast_params_to_volumes('your file path', resolution='monthly')
daily_volumes = forecast_params_to_volumes('your file path', resolution='daily')

'''
3 Likes

@carlos.pena not sure if you’re the correct person to ask but I saw your comment in the pagination document and was wondering if the forecast volumes calculation code could be added to the ComboCurve Client - Python?

@p.cui

For the moment we don’t have any plans to add this to our Python API client. We are trying to keep that client simple, with just the essential functionality for interacting with our API. I will discuss this with the development team currently working on the API, to see if we find enough value in this. For the moment you can use the code posted here by copying it into your own project. This code is directly based on the one we use internally to perform all the forecast calculations in CC, so it should work without issues.

Could you provide a sample response dictionary that works with this code? I’m trying to recreate a dictionary that mimics the API response for automation purposes with CC Sync.

You can find the structure and some examples of the json that would work with this code in the API documentation here. The file that’s expected in this code should contain the response from get-forecast-outputs; in other words, an array of ForecastOutput records.

And, of course, if you are planning to use dictionaries generated in some other way, instead of a json file, you will need to remove the part of the code that reads from a file, and everything else would work the same. Something like this:

def forecast_params_to_volumes(data: list[dict], resolution: str = 'daily'):

    # load and format json forecast parameters data
    forecast_params = format_json_data(data)

    # get forecast volumes from formatted data
    forecast_volumes = calc_daily_volumes_from_parameters(forecast_params, resolution)

    return forecast_volumes


'''
Examples to use this script:

monthly_volumes = forecast_params_to_volumes(your_list_of_dicts, resolution='monthly')
daily_volumes = forecast_params_to_volumes(your_list_of_dicts, resolution='daily')

'''

For the forecast volume calculation above the name_map in the sample originally posted needs to be updated to the following to include new parameters that we are using.

name_map = {
    'diEffSec': 'D_eff',
    'diNominal': 'D',
    'endDate': 'end_idx',
    'qEnd': 'q_end',
    'qStart': 'q_start',
    'realizedDSwEffSec': 'realized_D_eff_sw',
    'segmentIndex': 'segment_idx',
    'segmentType': 'name',
    'startDate': 'start_idx',
    'swDate': 'sw_idx',
    'b': 'b',
    'flatValue': 'c',
    'slope': 'k',
    'targetDSwEffSec': 'target_D_eff_sw',
}