Skip to content

Bedrock

Generates text using AWS Bedrock APIs.

Note

Only the AWS region us-east-1 (US East) is supported; the region is hard-coded when the Bedrock client is created.

BedrockClient

Bases: BatchClient

Source code in dactyl_generation/bedrock_generation.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
class BedrockClient(BatchClient):
    """
    Client for generating text with AWS Bedrock models.

    Offers a single-shot ``prompt`` call plus helpers to submit a Bedrock batch
    (model invocation) job and to collect its output from S3. The Bedrock client
    used for batch jobs is always created in the ``us-east-1`` region.
    """

    def __init__(self, role_arn: str):
        """
        Constructor for BedrockClient.

        Args:
            role_arn: ARN of role to use. It is passed as ``roleArn`` when a
                batch job is created.
        """
        super().__init__()
        self.role_arn = role_arn

    @staticmethod
    def prompt(messages: List[dict], model: str, temperature: float, top_p: float, max_completion_tokens: int = 512) -> str:
        """
        Prompt AWS Bedrock model with few shot learning examples.

        Args:
            messages: List of OpenAI messages
            model: name of model
            temperature: temperature parameter
            top_p: top p parameter
            max_completion_tokens: maximum number of tokens for completion

        Returns:
            response_content: string containing message content of the first choice
        """
        # NOTE(review): `completion` is imported elsewhere in this module; it is
        # presumably an OpenAI-compatible completion function -- confirm.
        response = completion(
            model,
            messages,
            temperature=temperature,
            top_p=top_p,
            max_completion_tokens=max_completion_tokens,
        )
        return response.choices[0].message.content

    @staticmethod
    def format_llama_prompt(messages: List[dict]) -> str:
        """
        Formats OpenAI style message to Llama 3.2 style.

        Every message becomes ``<start header>role<end header>content<|eot_id|>``;
        the prompt starts with ``<|begin_of_text|>`` and ends with an open
        assistant header so that the model writes the next assistant turn.

        Args:
            messages: list of dictionaries containing OpenAI style messages

        Returns:
            llama_prompt: formatted llama prompt
        """
        turns = "".join(
            LLAMA_START_HEADER + message[ROLE] + LLAMA_END_HEADER + message[CONTENT] + "<|eot_id|>"
            for message in messages
        )
        return "<|begin_of_text|>" + turns + f"{LLAMA_START_HEADER}assistant{LLAMA_END_HEADER}"

    @staticmethod
    def create_jsonl_input_for_llama(prompts_df: pd.DataFrame, s3_path: str) -> pd.DataFrame:
        """
        Creates a JSONL file to upload to S3.

        Each output line holds the record ID and a model input built from all
        remaining columns of the row, with the prompt column converted to the
        Llama prompt format. The caller's dataframe is not modified.

        Args:
            prompts_df: prompt dataframe containing OpenAI style messages; it
                must contain the record ID column and the prompt column.
            s3_path: Path to S3 bucket to save file (handed to ``DataFrame.to_json``)

        Returns:
            prompts_df_ret: copy of ``prompts_df`` (original, unformatted prompts)
                whose record ID column holds the IDs written to the file.
        """
        # Work on an explicit copy: pd.DataFrame(df) is not an independent copy,
        # so formatting the prompts there could leak into the caller's frame.
        formatted = prompts_df.copy()
        formatted[PROMPT] = formatted[PROMPT].apply(BedrockClient.format_llama_prompt)
        model_inputs = formatted.drop(columns=[RECORDID]).to_dict(orient="records")

        rows = [
            {RECORDID: record_id, MODELINPUT: model_input}
            for record_id, model_input in zip(formatted[RECORDID].values, model_inputs)
        ]
        # Explicit columns keep the frame well-formed when there are no prompts.
        input_frame = pd.DataFrame(rows, columns=[RECORDID, MODELINPUT])
        input_frame.to_json(s3_path, orient="records", index=False, lines=True)

        prompts_df_ret = prompts_df.copy()
        prompts_df_ret[RECORDID] = input_frame[RECORDID].to_list()
        return prompts_df_ret

    def create_batch_job(self, prompts_df: pd.DataFrame, s3_input_path: str, s3_output_path: str, model: str, job_name: str) -> dict:
        """
        Creates batch job for Bedrock Llama models.

        Writes the JSONL input to ``s3_input_path`` and then submits a model
        invocation job through a Bedrock client in ``us-east-1``.

        Args:
            prompts_df: Dataframe of OpenAI-style prompts.
            s3_input_path: Input data path.
            s3_output_path: Output data path.
            model: Bedrock model ID.
            job_name: Name of job

        Returns:
            job_record: dictionary holding the job ARN, the S3 output path, the
                API marker, the job name, the submitted prompt records (with
                the model ID added) and a UTC timestamp string.
        """
        inputted_frame = BedrockClient.create_jsonl_input_for_llama(prompts_df, s3_input_path)
        bedrock = boto3.client(service_name="bedrock", region_name="us-east-1")
        input_data_config = {
            S3_INPUT_DATA_CONFIG: {
                S3URI: s3_input_path
            }
        }
        output_data_config = {
            S3_OUTPUT_DATA_CONFIG: {
                S3URI: s3_output_path
            }
        }

        response = bedrock.create_model_invocation_job(
            roleArn=self.role_arn,
            modelId=model,
            jobName=job_name,
            inputDataConfig=input_data_config,
            outputDataConfig=output_data_config
        )
        inputted_frame[MODEL] = model
        return {
            JOB_ARN: response.get(JOB_ARN),
            S3_OUTPUT_DATA_CONFIG: s3_output_path,
            API_CALL: BEDROCK,
            JOB_NAME: job_name,
            # Round-trip through JSON so the records contain only JSON-safe values.
            INPUT_FILE: json.loads(inputted_frame.to_json(orient='records')),
            TIMESTAMP: str(datetime.now(timezone.utc))
        }

    def get_batch_job_output(self, file_path: str) -> pd.DataFrame:
        """
        Fetches batch job results given JSON file.

        The file is expected to hold the job ARN, the S3 output path, the
        submitted prompt records and the submission timestamp (the keys of the
        dictionary returned by ``create_batch_job``).

        Args:
            file_path: JSON file containing jobArn.

        Returns:
            output_df: Dataframe containing generations, left-joined with the
                submitted prompt records on the record ID, plus the timestamp.

        Raises:
            Exception: if no ``.jsonl.out`` object exists under the job's
                output folder (for example because the job has not finished).
        """
        with open(file_path, 'r') as file:
            data = json.load(file)
        # The last ARN segment is the job ID, which names the output folder.
        job_id = data[JOB_ARN].split("/")[-1]
        s3_resource = boto3.resource('s3')
        # "s3://bucket/prefix/..." splits into ["s3:", "", bucket, *prefix parts].
        uri_parts = data[S3_OUTPUT_DATA_CONFIG].split("/")
        bucket_name = uri_parts[2]
        bucket = s3_resource.Bucket(bucket_name)
        # Accept output paths with or without a trailing slash; without this an
        # output path such as "s3://bucket/out" would yield the prefix "outJOBID/".
        output_prefix = "/".join(uri_parts[3:])
        if output_prefix and not output_prefix.endswith("/"):
            output_prefix += "/"
        folder_path = output_prefix + job_id + "/"

        target_file = next(
            (
                object_summary.key
                for object_summary in bucket.objects.filter(Prefix=folder_path)
                if object_summary.key.endswith(".jsonl.out")
            ),
            None,
        )
        if not target_file:
            raise Exception(f"{bucket_name} does not contain .jsonl.out file! Please check if job has completed.")

        output_df = pd.read_json(f"s3://{bucket_name}/" + target_file, lines=True)
        rows = [
            {TEXT: model_output[GENERATION].strip(), RECORDID: record_id}
            for model_output, record_id in zip(output_df[MODEL_OUTPUT], output_df[RECORDID])
        ]
        # Explicit columns keep the merge below valid for an empty result file.
        outputs = pd.DataFrame(rows, columns=[TEXT, RECORDID])
        inputs = pd.DataFrame(data[INPUT_FILE])
        # NOTE(review): IDs are compared as strings; this assumes the stored
        # input records also carry string record IDs -- confirm.
        outputs[RECORDID] = outputs[RECORDID].astype(str)
        outputs = outputs.merge(inputs, how='left', on=RECORDID)
        outputs[TIMESTAMP] = data[TIMESTAMP]
        return outputs

__init__(role_arn)

Constructor for BedrockClient.

Parameters:

Name Type Description Default
role_arn str

ARN of role to use.

required
Source code in dactyl_generation/bedrock_generation.py
20
21
22
23
24
25
26
27
28
def __init__(self, role_arn: str):
    """
    Constructor for BedrockClient.

    Runs the base-class initialiser and then stores ``role_arn`` on the
    instance as ``self.role_arn``.

    Args:
        role_arn: ARN of role to use.
    """
    super().__init__()
    self.role_arn = role_arn

create_batch_job(prompts_df, s3_input_path, s3_output_path, model, job_name)

Creates batch job for Bedrock Llama models.

Parameters:

Name Type Description Default
prompts_df DataFrame

Dataframe of OpenAI-style prompts.

required
s3_input_path str

Input data path.

required
s3_output_path str

Output data path.

required
model str

Bedrock model ID.

required
job_name str

Name of job

required

Returns:

Name Type Description
jobArn dict

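Dictionary describing the submitted job: the job ARN, the S3 output path, the API marker, the job name, the submitted prompt records, and a UTC timestamp string.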

Source code in dactyl_generation/bedrock_generation.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def create_batch_job(self, prompts_df: pd.DataFrame, s3_input_path: str, s3_output_path: str, model: str,  job_name: str) -> dict:
    """
    Submit a Bedrock batch (model invocation) job for Llama models.

    The prompts are first written as JSONL to ``s3_input_path``; the job is
    then created through a Bedrock client in ``us-east-1`` using the role ARN
    stored on this instance.

    Args:
        prompts_df: Dataframe of OpenAI-style prompts.
        s3_input_path: Location the JSONL input is written to and read from.
        s3_output_path: Location Bedrock writes the job output to.
        model: Bedrock model ID.
        job_name: Name of the job.

    Returns:
        job_record: dictionary holding the job ARN, the S3 output path, the
            API marker, the job name, the submitted prompt records (with the
            model ID added) and a UTC timestamp string.
    """
    # The input file has to exist before the job that reads it is created.
    submitted_prompts = BedrockClient.create_jsonl_input_for_llama(prompts_df, s3_input_path)
    client = boto3.client(service_name="bedrock", region_name="us-east-1")

    job = client.create_model_invocation_job(
        roleArn=self.role_arn,
        modelId=model,
        jobName=job_name,
        inputDataConfig={S3_INPUT_DATA_CONFIG: {S3URI: s3_input_path}},
        outputDataConfig={S3_OUTPUT_DATA_CONFIG: {S3URI: s3_output_path}},
    )

    submitted_prompts[MODEL] = model

    job_record = dict()
    job_record[JOB_ARN] = job.get(JOB_ARN)
    job_record[S3_OUTPUT_DATA_CONFIG] = s3_output_path
    job_record[API_CALL] = BEDROCK
    job_record[JOB_NAME] = job_name
    # Serialise and re-parse so the records contain only JSON-safe values.
    records_json = submitted_prompts.to_json(orient='records')
    job_record[INPUT_FILE] = json.loads(records_json)
    job_record[TIMESTAMP] = str(datetime.now(timezone.utc))
    return job_record

create_jsonl_input_for_llama(prompts_df, s3_path) staticmethod

Creates a JSONL file to upload to S3.

Parameters:

Name Type Description Default
prompts_df DataFrame

prompt dataframe containing OpenAI style messages

required
s3_path str

Path to S3 bucket to save file

required

Returns:

Type Description
DataFrame

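Copy of the prompt dataframe (original, unformatted prompts) with the record IDs that were written to the JSONL file.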

Source code in dactyl_generation/bedrock_generation.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
@staticmethod
def create_jsonl_input_for_llama(prompts_df: pd.DataFrame, s3_path: str) -> pd.DataFrame:
    """
    Creates a JSONL file to upload to S3.

    Each output line holds the record ID and a model input built from all
    remaining columns of the row, with the prompt column converted to the
    Llama prompt format. The caller's dataframe is not modified.

    Args:
        prompts_df: prompt dataframe containing OpenAI style messages; it must
            contain the record ID column and the prompt column.
        s3_path: Path to S3 bucket to save file (handed to ``DataFrame.to_json``)

    Returns:
        prompts_df_ret: copy of ``prompts_df`` (original, unformatted prompts)
            whose record ID column holds the IDs written to the file.
    """
    # Work on an explicit copy: pd.DataFrame(df) is not an independent copy,
    # so formatting the prompts there could leak into the caller's frame.
    formatted = prompts_df.copy()
    formatted[PROMPT] = formatted[PROMPT].apply(BedrockClient.format_llama_prompt)
    model_inputs = formatted.drop(columns=[RECORDID]).to_dict(orient="records")

    rows = [
        {RECORDID: record_id, MODELINPUT: model_input}
        for record_id, model_input in zip(formatted[RECORDID].values, model_inputs)
    ]
    # Explicit columns keep the frame well-formed when there are no prompts.
    input_frame = pd.DataFrame(rows, columns=[RECORDID, MODELINPUT])
    input_frame.to_json(s3_path, orient="records", index=False, lines=True)

    prompts_df_ret = prompts_df.copy()
    prompts_df_ret[RECORDID] = input_frame[RECORDID].to_list()
    return prompts_df_ret

format_llama_prompt(messages) staticmethod

Formats OpenAI style message to Llama 3.2 style.

Parameters:

Name Type Description Default
messages List[dict]

list of dictionaries containing OpenAI style messages

required

Returns:

Name Type Description
llama_prompt str

formatted llama prompt

Source code in dactyl_generation/bedrock_generation.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@staticmethod
def format_llama_prompt(messages: List[dict]) -> str:
    """
    Convert OpenAI style messages into a single Llama 3.2 style prompt.

    Every message is rendered as its role wrapped in the Llama header markers,
    followed by its content and ``<|eot_id|>``. The prompt opens with
    ``<|begin_of_text|>`` and closes with an open assistant header.

    Args:
        messages: list of dictionaries containing OpenAI style messages

    Returns:
        llama_prompt: formatted llama prompt
    """
    rendered_turns = [
        LLAMA_START_HEADER + turn[ROLE] + LLAMA_END_HEADER + turn[CONTENT] + "<|eot_id|>"
        for turn in messages
    ]
    assistant_header = f"{LLAMA_START_HEADER}assistant{LLAMA_END_HEADER}"
    return "".join(["<|begin_of_text|>", *rendered_turns, assistant_header])

get_batch_job_output(file_path)

Fetches batch job results given JSON file.

Parameters:

Name Type Description Default
file_path str

JSON file containing jobArn.

required

Returns:

Name Type Description
output_df DataFrame

Dataframe containing generations.

Source code in dactyl_generation/bedrock_generation.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
def get_batch_job_output(self, file_path: str) -> pd.DataFrame:
    """
    Fetches batch job results given JSON file.

    The file is expected to hold the job ARN, the S3 output path, the
    submitted prompt records and the submission timestamp (the keys of the
    dictionary returned by ``create_batch_job``).

    Args:
        file_path: JSON file containing jobArn.

    Returns:
        output_df: Dataframe containing generations, left-joined with the
            submitted prompt records on the record ID, plus the timestamp.

    Raises:
        Exception: if no ``.jsonl.out`` object exists under the job's output
            folder (for example because the job has not finished).
    """
    with open(file_path, 'r') as file:
        data = json.load(file)
    # The last ARN segment is the job ID, which names the output folder.
    job_id = data[JOB_ARN].split("/")[-1]
    s3_resource = boto3.resource('s3')
    # "s3://bucket/prefix/..." splits into ["s3:", "", bucket, *prefix parts].
    uri_parts = data[S3_OUTPUT_DATA_CONFIG].split("/")
    bucket_name = uri_parts[2]
    bucket = s3_resource.Bucket(bucket_name)
    # Accept output paths with or without a trailing slash; without this an
    # output path such as "s3://bucket/out" would yield the prefix "outJOBID/".
    output_prefix = "/".join(uri_parts[3:])
    if output_prefix and not output_prefix.endswith("/"):
        output_prefix += "/"
    folder_path = output_prefix + job_id + "/"

    target_file = next(
        (
            object_summary.key
            for object_summary in bucket.objects.filter(Prefix=folder_path)
            if object_summary.key.endswith(".jsonl.out")
        ),
        None,
    )
    if not target_file:
        raise Exception(f"{bucket_name} does not contain .jsonl.out file! Please check if job has completed.")

    output_df = pd.read_json(f"s3://{bucket_name}/" + target_file, lines=True)
    rows = [
        {TEXT: model_output[GENERATION].strip(), RECORDID: record_id}
        for model_output, record_id in zip(output_df[MODEL_OUTPUT], output_df[RECORDID])
    ]
    # Explicit columns keep the merge below valid for an empty result file.
    outputs = pd.DataFrame(rows, columns=[TEXT, RECORDID])
    inputs = pd.DataFrame(data[INPUT_FILE])
    # NOTE(review): IDs are compared as strings; this assumes the stored input
    # records also carry string record IDs -- confirm.
    outputs[RECORDID] = outputs[RECORDID].astype(str)
    outputs = outputs.merge(inputs, how='left', on=RECORDID)
    outputs[TIMESTAMP] = data[TIMESTAMP]
    return outputs

prompt(messages, model, temperature, top_p, max_completion_tokens=512) staticmethod

Prompt AWS Bedrock model with few shot learning examples.

Parameters:

Name Type Description Default
messages List[dict]

List of OpenAI messages

required
model str

name of model

required
temperature float

temperature parameter

required
top_p float

top p parameter

required
max_completion_tokens int

maximum number of tokens for completion

512

Returns:

Name Type Description
response_content str

string containing message content

Source code in dactyl_generation/bedrock_generation.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@staticmethod
def prompt(messages:List[dict],  model: str, temperature: float, top_p: float, max_completion_tokens: int =512) -> str:
    """
    Send a list of chat messages to an AWS Bedrock model and return its reply.

    Args:
        messages: List of OpenAI messages (may include few shot examples)
        model: name of model
        temperature: temperature parameter
        top_p: top p parameter
        max_completion_tokens: maximum number of tokens for completion

    Returns:
        response_content: string containing message content of the first choice
    """
    sampling_options = {
        "temperature": temperature,
        "top_p": top_p,
        "max_completion_tokens": max_completion_tokens,
    }
    reply = completion(model, messages, **sampling_options)
    first_choice = reply.choices[0]
    return first_choice.message.content