Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose bundle retry settings #268

Merged
merged 5 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ services:
- TRUSTED_NODE_URL=${TRUSTED_NODE_URL:-}
- TRUSTED_GATEWAY_URL=https://${TRUSTED_GATEWAY_HOST:-arweave.net}
- TRUSTED_GATEWAYS_URLS=${TRUSTED_GATEWAYS_URLS:-}
- TRUSTED_GATEWAYS_REQUEST_TIMEOUT_MS=${TRUSTED_GATEWAYS_REQUEST_TIMEOUT_MS:-}
- START_HEIGHT=${START_HEIGHT:-}
- STOP_HEIGHT=${STOP_HEIGHT:-}
- SKIP_CACHE=${SKIP_CACHE:-}
Expand Down Expand Up @@ -117,6 +118,8 @@ services:
- FS_CLEANUP_WORKER_BATCH_SIZE=${FS_CLEANUP_WORKER_BATCH_SIZE:-}
- FS_CLEANUP_WORKER_BATCH_PAUSE_DURATION=${FS_CLEANUP_WORKER_BATCH_PAUSE_DURATION:-}
- FS_CLEANUP_WORKER_RESTART_PAUSE_DURATION=${FS_CLEANUP_WORKER_RESTART_PAUSE_DURATION:-}
- BUNDLE_REPAIR_RETRY_INTERVAL_SECONDS=${BUNDLE_REPAIR_RETRY_INTERVAL_SECONDS:-}
- BUNDLE_REPAIR_RETRY_BATCH_SIZE=${BUNDLE_REPAIR_RETRY_BATCH_SIZE:-}
networks:
- ar-io-network
depends_on:
Expand Down
5 changes: 4 additions & 1 deletion docs/envs.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ This document describes the environment variables that can be used to configure
| TRUSTED_NODE_URL | String | "https://arweave.net" | Arweave node to use for fetching data |
| TRUSTED_GATEWAY_URL | String | "https://arweave.net" | Arweave node to use for proxying requests |
| TRUSTED_GATEWAYS_URLS | String | TRUSTED_GATEWAY_URL | A JSON map of gateways and priority |
| TRUSTED_GATEWAYS_REQUEST_TIMEOUT_MS | String | "10000" | Request timeout in milliseconds for trusted gateways |
| TRUSTED_ARNS_GATEWAY_URL | String | "https://__NAME__.arweave.dev" | ArNS gateway |
| INSTANCE_ID | String | "" | Adds an "INSTANCE_ID" field to output logs |
| LOG_FORMAT | String | "simple" | Sets the format of output logs, accepts "simple" and "json" |
Expand Down Expand Up @@ -74,4 +75,6 @@ This document describes the environment variables that can be used to configure
| AWS_ENDPOINT | String | undefined | Custom endpoint for AWS services |
| AWS_S3_CONTIGUOUS_DATA_BUCKET | String | undefined | AWS S3 bucket name used for storing data |
| AWS_S3_CONTIGUOUS_DATA_PREFIX | String | undefined | Prefix for the S3 bucket to organize data |
| CHUNK_POST_MIN_SUCCESS_COUNT | String | "3" | minimum count of 200 responses for of a given chunk to be considered properly seeded |
| CHUNK_POST_MIN_SUCCESS_COUNT | String | "3" | Minimum count of 200 responses for of a given chunk to be considered properly seeded |
| BUNDLE_REPAIR_RETRY_INTERVAL_SECONDS | String | "300" | Interval in seconds for retrying bundles |
| BUNDLE_REPAIR_RETRY_BATCH_SIZE | String | "1000" | Batch size for retrying bundles |
15 changes: 15 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ Object.entries(TRUSTED_GATEWAYS_URLS).forEach(([url, weight]) => {
}
});

export const TRUSTED_GATEWAYS_REQUEST_TIMEOUT_MS = +env.varOrDefault(
'TRUSTED_GATEWAYS_REQUEST_TIMEOUT_MS',
'10000',
);

// Trusted chunk POST URLs (for posting chunks received at /chunk)
export const CHUNK_POST_URLS = env
.varOrDefault('CHUNK_POST_URLS', `${TRUSTED_NODE_URL}/chunk`)
Expand Down Expand Up @@ -252,6 +257,16 @@ export const MAX_FLUSH_INTERVAL_SECONDS = +env.varOrDefault(
'600',
);

export const BUNDLE_REPAIR_RETRY_INTERVAL_SECONDS = +env.varOrDefault(
'BUNDLE_REPAIR_RETRY_INTERVAL_SECONDS',
'300', // 5 minutes
);

export const BUNDLE_REPAIR_RETRY_BATCH_SIZE = +env.varOrDefault(
'BUNDLE_REPAIR_RETRY_BATCH_SIZE',
'1000',
);

//
// File system cleanup
//
Expand Down
16 changes: 11 additions & 5 deletions src/data/gateways-data-source.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ import * as metrics from '../metrics.js';
import { TestDestroyedReadable, axiosStreamData } from './test-utils.js';
import { Readable } from 'node:stream';

const axiosMockCommonParams = (config: any) => ({
interceptors: {
request: { use: () => {} }, // eslint-disable-line @typescript-eslint/no-empty-function
response: { use: () => {} }, // eslint-disable-line @typescript-eslint/no-empty-function
},
defaults: config,
});

let log: winston.Logger;
let dataSource: GatewaysDataSource;
let mockedAxiosInstance: any;
Expand All @@ -45,9 +53,7 @@ beforeEach(async () => {
'X-AR-IO-Origin': 'node-url',
},
}),
defaults: {
baseURL: 'https://gateway.domain',
},
...axiosMockCommonParams({ baseURL: 'https://gateway.domain' }),
};

mock.method(axios, 'create', () => mockedAxiosInstance);
Expand Down Expand Up @@ -129,7 +135,7 @@ describe('GatewayDataSource', () => {
},
};
},
defaults: config,
...axiosMockCommonParams(config),
}));

await dataSource.getData({ id: 'test-id' });
Expand Down Expand Up @@ -167,7 +173,7 @@ describe('GatewayDataSource', () => {
},
};
},
defaults: config,
...axiosMockCommonParams(config),
}));

await dataSource.getData({ id: 'test-id' });
Expand Down
41 changes: 38 additions & 3 deletions src/data/gateways-data-source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ import {
RequestAttributes,
} from '../types.js';
import * as metrics from '../metrics.js';

const DEFAULT_REQUEST_TIMEOUT_MS = 10000;
import * as config from '../config.js';

export class GatewaysDataSource implements ContiguousDataSource {
private log: winston.Logger;
Expand All @@ -40,7 +39,7 @@ export class GatewaysDataSource implements ContiguousDataSource {
constructor({
log,
trustedGatewaysUrls,
requestTimeoutMs = DEFAULT_REQUEST_TIMEOUT_MS,
requestTimeoutMs = config.TRUSTED_GATEWAYS_REQUEST_TIMEOUT_MS,
}: {
log: winston.Logger;
trustedGatewaysUrls: Record<string, number>;
Expand Down Expand Up @@ -104,6 +103,42 @@ export class GatewaysDataSource implements ContiguousDataSource {
timeout: this.requestTimeoutMs,
});

gatewayAxios.interceptors.request.use((config) => {
this.log.debug('Axios request initiated', {
url: config.url,
method: config.method,
headers: config.headers,
params: config.params,
timeout: config.timeout,
});
return config;
});

gatewayAxios.interceptors.response.use(
(response) => {
this.log.debug('Axios response received', {
url: response.config.url,
status: response.status,
headers: response.headers,
});
return response;
},
(error) => {
if (error.response) {
this.log.error('Axios response error', {
url: error.response.config.url,
status: error.response.status,
headers: error.response.headers,
});
} else {
this.log.error('Axios network error', {
message: error.message,
});
}
return Promise.reject(error);
},
);
Comment on lines +106 to +140
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve logging security and consistency.

The interceptors enhance observability but have some concerns:

  1. Avoid logging sensitive information in headers that might contain auth tokens.
  2. Use consistent log levels - consider using debug for successful responses and warn for errors.

Apply this diff to improve the logging implementation:

 gatewayAxios.interceptors.request.use((config) => {
   this.log.debug('Axios request initiated', {
     url: config.url,
     method: config.method,
-    headers: config.headers,
     params: config.params,
     timeout: config.timeout,
   });
   return config;
 });

 gatewayAxios.interceptors.response.use(
   (response) => {
     this.log.debug('Axios response received', {
       url: response.config.url,
       status: response.status,
-      headers: response.headers,
     });
     return response;
   },
   (error) => {
     if (error.response) {
-      this.log.error('Axios response error', {
+      this.log.warn('Axios response error', {
         url: error.response.config.url,
         status: error.response.status,
-        headers: error.response.headers,
       });
     } else {
-      this.log.error('Axios network error', {
+      this.log.warn('Axios network error', {
         message: error.message,
       });
     }
     return Promise.reject(error);
   },
 );
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
gatewayAxios.interceptors.request.use((config) => {
this.log.debug('Axios request initiated', {
url: config.url,
method: config.method,
headers: config.headers,
params: config.params,
timeout: config.timeout,
});
return config;
});
gatewayAxios.interceptors.response.use(
(response) => {
this.log.debug('Axios response received', {
url: response.config.url,
status: response.status,
headers: response.headers,
});
return response;
},
(error) => {
if (error.response) {
this.log.error('Axios response error', {
url: error.response.config.url,
status: error.response.status,
headers: error.response.headers,
});
} else {
this.log.error('Axios network error', {
message: error.message,
});
}
return Promise.reject(error);
},
);
gatewayAxios.interceptors.request.use((config) => {
this.log.debug('Axios request initiated', {
url: config.url,
method: config.method,
params: config.params,
timeout: config.timeout,
});
return config;
});
gatewayAxios.interceptors.response.use(
(response) => {
this.log.debug('Axios response received', {
url: response.config.url,
status: response.status,
});
return response;
},
(error) => {
if (error.response) {
this.log.warn('Axios response error', {
url: error.response.config.url,
status: error.response.status,
});
} else {
this.log.warn('Axios network error', {
message: error.message,
});
}
return Promise.reject(error);
},
);


this.log.debug('Attempting to fetch contiguous data from gateway', {
id,
gatewayUrl,
Expand Down
7 changes: 3 additions & 4 deletions src/workers/bundle-repair-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import * as winston from 'winston';
import * as config from '../config.js';

import { BundleIndex } from '../types.js';
import { TransactionFetcher } from './transaction-fetcher.js';

const DEFAULT_RETRY_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes
const DEFAULT_UPDATE_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes
const DEFAULT_BUNDLE_BACKFILL_INTERVAL_MS = 15 * 60 * 1000; // 15 minutes
const DEFAULT_FILTER_REPOCESS_INTERVAL_MS = 15 * 60 * 1000; // 15 minutes
const DEFAULT_BUNDLES_TO_RETRY = 1000;

export class BundleRepairWorker {
// Dependencies
Expand Down Expand Up @@ -66,7 +65,7 @@ export class BundleRepairWorker {
async start(): Promise<void> {
const defaultInterval = setInterval(
this.retryBundles.bind(this),
DEFAULT_RETRY_INTERVAL_MS,
config.BUNDLE_REPAIR_RETRY_INTERVAL_SECONDS * 1000,
);
this.intervalIds.push(defaultInterval);
const defaultUpdateInterval = setInterval(
Expand Down Expand Up @@ -104,7 +103,7 @@ export class BundleRepairWorker {
async retryBundles() {
try {
const bundleIds = await this.bundleIndex.getFailedBundleIds(
DEFAULT_BUNDLES_TO_RETRY,
config.BUNDLE_REPAIR_RETRY_BATCH_SIZE,
);
for (const bundleId of bundleIds) {
this.log.info('Retrying failed bundle', { bundleId });
Expand Down
Loading