from onegov.election_day import _
from onegov.election_day.formats.imports.common import EXPATS
from onegov.election_day.formats.imports.common import FileImportError
from onegov.election_day.formats.imports.common import get_entity_and_district
from onegov.election_day.formats.imports.common import load_csv
from onegov.election_day.formats.imports.common import validate_integer
from onegov.election_day.models import BallotResult
from sqlalchemy.orm import object_session
from typing import Any
from typing import IO
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from onegov.core.csv import DefaultRow
from onegov.election_day.models import Canton
from onegov.election_day.models import Municipality
from onegov.election_day.models import Vote
from onegov.election_day.types import BallotType
# TODO: TypedDict for BallotResult
[docs]
def parse_domain(domain: str) -> str | None:
if domain in ('Eidg', 'CH'):
return 'federation'
if domain in ('Kant', 'CT'):
return 'canton'
if domain in ('Gde', 'MU'):
return 'municipality'
return None
[docs]
def line_is_relevant(
line: 'DefaultRow',
domain: str,
district: str,
number: str
) -> bool:
return (
parse_domain(line.art) == domain
and line.sortwahlkreis == district
and line.sortgeschaeft == number
)
[docs]
def import_vote_wabstic(
vote: 'Vote',
principal: 'Canton | Municipality',
number: str,
district: str,
file_sg_geschaefte: IO[bytes],
mimetype_sg_geschaefte: str,
file_sg_gemeinden: IO[bytes],
mimetype_sg_gemeinden: str
) -> list[FileImportError]:
""" Tries to import the given CSV files from a WabstiCExport.
This function is typically called automatically every few minutes during
an election day - we use bulk inserts to speed up the import.
:return:
A list containing errors.
"""
errors = []
entities = principal.entities[vote.date.year]
# Read the files
sg_geschaefte, error = load_csv(
file_sg_geschaefte, mimetype_sg_geschaefte,
expected_headers=WABSTIC_VOTE_HEADERS_SG_GESCHAEFTE,
filename='sg_geschaefte'
)
if error:
errors.append(error)
sg_gemeinden, error = load_csv(
file_sg_gemeinden, mimetype_sg_gemeinden,
expected_headers=WABSTIC_VOTE_HEADERS_SG_GEMEINDEN,
filename='sg_gemeinden'
)
if error:
errors.append(error)
if errors:
return errors
# Get the vote type
used_ballot_types: list[BallotType] = ['proposal']
if vote.type == 'complex':
used_ballot_types.extend(['counter-proposal', 'tie-breaker'])
# Parse the vote
remaining_entities = None
assert sg_geschaefte is not None
for line in sg_geschaefte.lines:
line_errors: list[str] = []
if not line_is_relevant(line, vote.domain, district, number):
continue
remaining_entities = None
try:
if hasattr(line, 'anzpendentgde'):
remaining_entities = validate_integer(
line, 'anzpendentgde', default=None
)
else:
remaining_entities = validate_integer(
line, 'anzgdependent', default=None
)
except Exception as e:
line_errors.append(
_('Error in anzpendentgde/anzgdependent: ${msg}',
mapping={'msg': e.args[0]}))
if line_errors:
errors.extend(
FileImportError(
error=err, line=line.rownumber, filename='sg_geschaefte'
)
for err in line_errors
)
continue
# Parse the results
ballot_results: dict[BallotType, list[dict[str, Any]]]
ballot_results = {key: [] for key in used_ballot_types}
added_entities = []
assert sg_gemeinden is not None
for line in sg_gemeinden.lines:
line_errors = []
if not line_is_relevant(line, vote.domain, district, number):
continue
# Parse the id of the entity
entity_id = None
entity_name = ''
entity_district = None
try:
entity_id = validate_integer(line, 'bfsnrgemeinde')
except ValueError as e:
line_errors.append(e.args[0])
else:
entity_id = 0 if entity_id in EXPATS else entity_id
if entity_id in added_entities:
line_errors.append(
_('${name} was found twice', mapping={'name': entity_id}))
else:
added_entities.append(entity_id)
if entity_id and entity_id not in entities:
line_errors.append(
_('${name} is unknown', mapping={'name': entity_id}))
else:
entity_name, entity_district, _superregion = (
get_entity_and_district(
entity_id, entities, vote, principal, line_errors
)
)
# Skip expats if not enabled
if entity_id == 0 and not vote.has_expats:
continue
# Check if the entity is counted
try:
counted_num = validate_integer(line, 'sperrung')
counted = counted_num != 0
except ValueError:
line_errors.append(_('Invalid values'))
else:
if not counted:
continue
# Parse the eligible voters
try:
eligible_voters = validate_integer(line, 'stimmberechtigte')
except ValueError as e:
line_errors.append(e.args[0])
# Parse the invalid votes
try:
invalid = validate_integer(line, 'stmungueltig')
except ValueError as e:
line_errors.append(e.args[0])
# Parse the yeas
yeas = {}
try:
yeas['proposal'] = validate_integer(line, 'stmhgja')
yeas['counter-proposal'] = validate_integer(line, 'stmn1ja')
yeas['tie-breaker'] = validate_integer(line, 'stmn2ja')
except ValueError as e:
line_errors.append(e.args[0])
# Parse the nays
nays = {}
try:
nays['proposal'] = validate_integer(line, 'stmhgnein')
nays['counter-proposal'] = validate_integer(line, 'stmn1nein')
nays['tie-breaker'] = validate_integer(line, 'stmn2nein')
except ValueError as e:
line_errors.append(e.args[0])
# Parse the empty votes
empty = {}
try:
empty['proposal'] = (
validate_integer(line, 'stmleer')
or validate_integer(line, 'stmhgohneaw')
)
empty['counter-proposal'] = validate_integer(line, 'stmn1ohneaw')
empty['tie-breaker'] = validate_integer(line, 'stmn2ohneaw')
except ValueError:
line_errors.append(_('Could not read the empty votes'))
# Pass the line errors
if line_errors:
errors.extend(
FileImportError(
error=err, line=line.rownumber, filename='sg_gemeinden'
)
for err in line_errors
)
continue
# all went well (only keep doing this as long as there are no errors)
if not errors:
for ballot_type in used_ballot_types:
ballot_results[ballot_type].append(
{
'entity_id': entity_id,
'name': entity_name,
'district': entity_district,
'counted': counted,
'eligible_voters': eligible_voters,
'invalid': invalid,
'yeas': yeas[ballot_type],
'nays': nays[ballot_type],
'empty': empty[ballot_type]
}
)
if errors:
return errors
# Add the missing entities
for ballot_type in used_ballot_types:
remaining = set(entities.keys())
if vote.has_expats:
remaining.add(0)
remaining -= {r['entity_id'] for r in ballot_results[ballot_type]}
for entity_id in remaining:
entity_name, entity_district, _superregion = (
get_entity_and_district(
entity_id, entities, vote, principal
)
)
if vote.domain == 'municipality':
if principal.domain != 'municipality':
if entity_name != vote.domain_segment:
continue
ballot_results[ballot_type].append(
{
'entity_id': entity_id,
'name': entity_name,
'district': entity_district,
'counted': False,
}
)
# Add the results to the DB
vote.clear_results()
vote.last_result_change = vote.timestamp()
vote.status = 'unknown'
if remaining_entities == 0:
vote.status = 'final'
ballot_ids = {b: vote.ballot(b).id for b in used_ballot_types}
session = object_session(vote)
session.flush()
session.bulk_insert_mappings(
BallotResult,
(
dict(**result, ballot_id=ballot_ids[ballot_type])
for ballot_type in used_ballot_types
for result in ballot_results[ballot_type]
)
)
session.expire(vote)
return []