Skip to content

Commit

Permalink
feat(server): remove housing events related to the removed housing ow…
Browse files Browse the repository at this point in the history
…ners
  • Loading branch information
Falinor committed Jan 6, 2025
1 parent 18bc08e commit 2cff2c6
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 34 deletions.
1 change: 1 addition & 0 deletions .talismanrc
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ allowed_patterns:
- \[key\]
- key=
- 'key:'
- .keys()
version: "1.0"
102 changes: 102 additions & 0 deletions server/src/scripts/import-unified-owners/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Import unified owners

## Usage

### Migrating

```shell
DATABASE_URL=... yarn workspace @zerologementvacant/server migrate
```

### Importing to postgres

```sql
INSTALL httpfs;
LOAD httpfs;

CREATE OR REPLACE SECRET (
TYPE S3,
KEY_ID '...',
SECRET '...',
ENDPOINT 'cellar-c2.services.clever-cloud.com',
REGION 'us-east-1'
);

CREATE TABLE IF NOT EXISTS "dev"."main"."unified_owners" AS
SELECT
id,
ff_owner_idpersonne AS idpersonne
FROM
read_csv(
's3://zlv-production/production/dump_20241218/unified_owners.csv',
auto_detect = TRUE,
ignore_errors = TRUE
)
ORDER BY id;

WITH duplicates AS (
SELECT COUNT(*), unified_owners.idpersonne FROM unified_owners
GROUP BY unified_owners.idpersonne
HAVING COUNT(*) > 1
)
SELECT * FROM duplicates
LIMIT 10;

COPY (
SELECT DISTINCT ON (idpersonne) idpersonne, id FROM "dev"."main"."unified_owners"
) TO 'unified-owners.csv' (HEADER, DELIMITER ',');

-- Import to postgres
INSTALL postgres;
LOAD postgres;

CREATE OR REPLACE SECRET (
TYPE POSTGRES,
HOST 'localhost',
PORT 5432,
DATABASE 'dev',
USER 'postgres',
PASSWORD 'postgres'
);

ATTACH IF NOT EXISTS '' AS postgres (TYPE POSTGRES);

TRUNCATE TABLE "postgres"."public"."owners_dept";

INSERT INTO "postgres"."public"."owners_dept"
SELECT id AS owner_id, idpersonne AS owner_idpersonne FROM read_csv(
'unified-owners.csv',
auto_detect = TRUE,
header = TRUE,
ignore_errors = TRUE
);


-- Show some metrics
SELECT COUNT(*) FROM "dev"."main"."unified_owners";
SELECT COUNT(*) FROM "postgres"."public"."owners_dept";

-- Are there housings that have several awaiting owners?
SELECT housing_id, COUNT(*) AS count FROM "postgres"."public"."owners_housing"
WHERE rank = -2
GROUP BY housing_id
HAVING COUNT(*) >= 2;

-- Awaiting national housing owners
SELECT COUNT(*) FROM "postgres"."public"."owners_housing"
JOIN "postgres"."public"."owners" ON owners.id = owners_housing.owner_id
WHERE rank = -2 AND idpersonne IS NULL;

-- Departmental housing owners
SELECT COUNT(*) FROM "postgres"."public"."owners_housing"
JOIN "postgres"."public"."owners" ON owners.id = owners_housing.owner_id
WHERE rank >= 1 AND idpersonne IS NOT NULL;
```

### Running the script

This script will be processing the actual housing owners, in production.
```shell
cd server
DATABASE_URL=... yarn ts-node src/scripts/import-unified-owners/index.ts
```
62 changes: 43 additions & 19 deletions server/src/scripts/import-unified-owners/command.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,35 @@
import {
createProcessor,
FindHousingOwnersOptions
FindHousingOwnersOptions,
RemoveEventsOptions
} from '~/scripts/import-unified-owners/processor';
import { AWAITING_RANK, HousingOwnerApi } from '~/models/HousingOwnerApi';
import {
HousingOwners,
housingOwnersTable
} from '~/repositories/housingOwnerRepository';
import {
Owners,
ownerTable,
parseHousingOwnerApi
} from '~/repositories/ownerRepository';
import departmentalOwnersRepository from '~/repositories/departmentalOwnersRepository';
import { createLogger } from '~/infra/logger';
import {
eventsTable,
HousingEvents,
housingEventsTable
} from '~/repositories/eventRepository';
import { usersTable } from '~/repositories/userRepository';

const logger = createLogger('command');

export default function createImportUnifiedOwnersCommand() {
return async (): Promise<void> => {
const processor = createProcessor({
findHousingOwners,
updateHousingOwner,
removeHousingOwner
removeHousingOwner,
removeEvents
});

await departmentalOwnersRepository.stream().pipeTo(processor);
Expand All @@ -29,31 +39,30 @@ export default function createImportUnifiedOwnersCommand() {
export async function findHousingOwners(
options: FindHousingOwnersOptions
): Promise<ReadonlyArray<HousingOwnerApi>> {
const housingOwners = await HousingOwners()
const query = HousingOwners()
.select(`${housingOwnersTable}.*`)
.join(ownerTable, `${ownerTable}.id`, `${housingOwnersTable}.owner_id`)
.select(`${ownerTable}.*`)
.where((where) => {
where.where({
owner_id: options.nationalOwner,
rank: AWAITING_RANK
});
})
.orWhere((where) => {
where
.whereIn(
'owner_id',
Owners().select('id').where('idpersonne', options.departmentalOwner)
)
.where('rank', '>=', 1);
});
.select(`${ownerTable}.*`);

// Split the request to allow Postgres to use the indexes
const [nationalOwners, departmentalOwners] = await Promise.all([
query.clone().where({
owner_id: options.nationalOwner,
rank: AWAITING_RANK
}),
query
.clone()
.where('idpersonne', options.departmentalOwner)
.where('rank', '>=', 1)
]);
const housingOwners = nationalOwners.concat(departmentalOwners);
return housingOwners.map(parseHousingOwnerApi);
}

export async function updateHousingOwner(
housingOwner: HousingOwnerApi
): Promise<void> {
logger.debug('Updating housing owner...', housingOwner);
await HousingOwners().update({ rank: housingOwner.rank }).where({
owner_id: housingOwner.ownerId,
housing_id: housingOwner.housingId,
Expand All @@ -64,9 +73,24 @@ export async function updateHousingOwner(
export async function removeHousingOwner(
housingOwner: HousingOwnerApi
): Promise<void> {
logger.debug('Removing housing owner...', housingOwner);
await HousingOwners().delete().where({
owner_id: housingOwner.ownerId,
housing_id: housingOwner.housingId,
housing_geo_code: housingOwner.housingGeoCode
});
}

export async function removeEvents(
options: RemoveEventsOptions
): Promise<void> {
logger.debug('Removing events...', options);
await HousingEvents()
.where({ housing_id: options.housingId })
.join(eventsTable, `${eventsTable}.id`, `${housingEventsTable}.event_id`)
.where({ name: 'Changement de propriétaires' })
.whereRaw(`${eventsTable}.created_at::date = '2024-09-08'`)
.join(usersTable, `${usersTable}.id`, `${eventsTable}.created_by`)
.where(`${usersTable}.email`, '=', '[email protected]')
.delete();
}
44 changes: 31 additions & 13 deletions server/src/scripts/import-unified-owners/processor.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import async from 'async';
import { List, Map } from 'immutable';
import { WritableStream } from 'node:stream/web';

import { DepartmentalOwnerDBO } from '~/repositories/departmentalOwnersRepository';
import { createLogger } from '~/infra/logger';
import { AWAITING_RANK, HousingOwnerApi } from '~/models/HousingOwnerApi';
import { List } from 'immutable';
import async from 'async';

const logger = createLogger('processor');

Expand All @@ -18,16 +19,26 @@ export interface FindHousingOwnersOptions {
departmentalOwner: string;
}

export interface RemoveEventsOptions {
housingId: string;
}

export interface ProcessorOptions {
findHousingOwners(
options: FindHousingOwnersOptions
): Promise<ReadonlyArray<HousingOwnerApi>>;
updateHousingOwner(housingOwner: HousingOwnerApi): Promise<void>;
removeHousingOwner(housingOwner: HousingOwnerApi): Promise<void>;
removeEvents(options: RemoveEventsOptions): Promise<void>;
}

export function createProcessor(options: ProcessorOptions) {
const { findHousingOwners, updateHousingOwner, removeHousingOwner } = options;
const {
findHousingOwners,
updateHousingOwner,
removeHousingOwner,
removeEvents
} = options;

return new WritableStream<DepartmentalOwnerDBO>({
async write(chunk): Promise<void> {
Expand All @@ -39,8 +50,16 @@ export function createProcessor(options: ProcessorOptions) {
departmentalOwner: chunk.owner_idpersonne
});
const pairs = toPairs(housingOwners);
// Remove the departmental housing owner
// Update the national housing owner
await async.forEach(
pairs,
pairs
.toIndexedSeq()
.map(
(housingOwners) =>
housingOwners.toArray() as [HousingOwnerApi, HousingOwnerApi]
)
.toArray(),
async ([nationalOwner, departmentalOwner]) => {
await removeHousingOwner(departmentalOwner);
await updateHousingOwner({
Expand All @@ -49,16 +68,21 @@ export function createProcessor(options: ProcessorOptions) {
});
}
);
// Remove the associated events
await async.forEach(pairs.keys(), async (housingId) => {
await removeEvents({ housingId });
});
} catch (error) {
// TODO
logger.error(error);
throw error;
}
}
});
}

export function toPairs(
housingOwners: ReadonlyArray<HousingOwnerApi>
): [HousingOwnerApi, HousingOwnerApi][] {
): Map<HousingOwnerApi['housingId'], List<HousingOwnerApi>> {
return List(housingOwners)
.groupBy((housingOwner) => housingOwner.housingId)
.filter(
Expand All @@ -75,13 +99,7 @@ export function toPairs(
)
.map((housingOwners) =>
housingOwners.sortBy((housingOwner) => housingOwner.rank)
)
.toIndexedSeq()
.map(
(housingOwners) =>
housingOwners.toArray() as [HousingOwnerApi, HousingOwnerApi]
)
.toArray();
);
}

export function isNationalOwner(housingOwner: HousingOwnerApi): boolean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ describe('Processor', () => {
]);
const updateHousingOwner = jest.fn(async () => {});
const removeHousingOwner = jest.fn(async () => {});
const removeEvents = jest.fn(async () => {});

beforeAll(async () => {
const stream = new ReadableStream<DepartmentalOwnerDBO>({
Expand All @@ -63,7 +64,8 @@ describe('Processor', () => {
const processor = createProcessor({
findHousingOwners,
updateHousingOwner,
removeHousingOwner
removeHousingOwner,
removeEvents
});

await stream.pipeTo(processor);
Expand All @@ -82,7 +84,10 @@ describe('Processor', () => {
expect(removeHousingOwner).toHaveBeenCalledWith(departmentalOwner);
});

it.todo('should remove the events concerning a change of ownership');
it('should remove the events concerning a change of ownership', () => {
expect(removeEvents).toHaveBeenCalledOnce();
expect(removeEvents).toHaveBeenCalledWith({ housingId: housing.id });
});
});

describe('toPairs', () => {
Expand Down

0 comments on commit 2cff2c6

Please sign in to comment.