Events storage procedure

Added the necessary database code to convert the contents of event
queues into actual event records. The changes include:
 * a new table, events.pending_events, which is automatically filled by
a trigger when events are inserted into queue tables,
 * the game.events.batchSize constant which defines the maximal amount
of events to process in a single transaction,
 * the events.eq_process() stored procedure, which processes the events.
In addition, the "hstore" extension was added as it is the easiest way
to convert events from the queues' table model to the store's
meta-model.
This commit is contained in:
Emmanuel BENOîT 2012-07-01 14:12:22 +02:00
parent dc9ef2292d
commit 3a0f5bbb78
5 changed files with 248 additions and 4 deletions

View file

@ -32,6 +32,8 @@ public class ConstantsRegistrarBean
// Misc. game-related values // Misc. game-related values
cDesc = "Game updates - batch size."; cDesc = "Game updates - batch size.";
defs.add( new ConstantDefinition( "game.batchSize" , "Game (misc)" , cDesc , 20.0 , 1.0 , true ) ); defs.add( new ConstantDefinition( "game.batchSize" , "Game (misc)" , cDesc , 20.0 , 1.0 , true ) );
cDesc = "Event processing - batch size.";
defs.add( new ConstantDefinition( "game.events.batchSize" , "Game (misc)" , cDesc , 100.0 , 1.0 , true ) );
cDesc = "Population growth factor."; cDesc = "Population growth factor.";
defs.add( new ConstantDefinition( "game.growthFactor" , "Game (misc)" , cDesc , 50.0 , 1.0 , true ) ); defs.add( new ConstantDefinition( "game.growthFactor" , "Game (misc)" , cDesc , 50.0 , 1.0 , true ) );
cDesc = "Increase to the population growth factor caused by reanimation centres."; cDesc = "Increase to the population growth factor caused by reanimation centres.";

View file

@ -28,3 +28,8 @@ CREATE USER MAPPING FOR :dbuser
OPTIONS ( user :dbuser_string , password :dbupass ); OPTIONS ( user :dbuser_string , password :dbupass );
GRANT USAGE ON FOREIGN SERVER srv_logging TO :dbuser; GRANT USAGE ON FOREIGN SERVER srv_logging TO :dbuser;
/* The events sytem uses the hash store extension to convert events from the
* queues to main storage.
*/
CREATE EXTENSION hstore;

View file

@ -239,6 +239,28 @@ CREATE SEQUENCE events.event_id_sequence;
GRANT SELECT,UPDATE ON events.event_id_sequence TO :dbuser; GRANT SELECT,UPDATE ON events.event_id_sequence TO :dbuser;
/*
* Pending events
* --------------
*
* This table is updated when events are inserted into queues and when they
* are converted. It tracks events that need to be converted.
*
* Note: the table does not have a primary key or foreign keys; it is simply
* a cache which must not get in the way of inserts into queues.
*
* Warning: a record in this table may be orphaned, i.e. there could be no
* corresponding entry in the queue table.
*/
DROP TABLE IF EXISTS events.pending_events CASCADE;
CREATE TABLE events.pending_events(
/* Event identifier */
event_id BIGINT NOT NULL ,
/* Event type */
evdef_id VARCHAR(48) NOT NULL
);
/* /*
* Event storage table * Event storage table
* ------------------- * -------------------

View file

@ -707,6 +707,33 @@ REVOKE EXECUTE
FROM PUBLIC; FROM PUBLIC;
/*
* Trigger used to insert entries into pending_events
* --------------------------------------------------
*
* This function is used as a trigger to insert queued events into the
* events.pending_events table.
*
* Parameters:
* TG_ARGV[0] Event type identifier
*/
DROP FUNCTION IF EXISTS events.tgf_eq_insert( );
CREATE FUNCTION events.tgf_eq_insert( )
RETURNS TRIGGER
LANGUAGE PLPGSQL
STRICT VOLATILE SECURITY DEFINER
AS $tgf_eq_insert$
BEGIN
INSERT INTO events.pending_events( event_id , evdef_id )
VALUES ( NEW.event_id , TG_ARGV[0] );
RETURN NEW;
END;
$tgf_eq_insert$;
REVOKE EXECUTE
ON FUNCTION events.tgf_eq_insert( )
FROM PUBLIC;
/* /*
* Create the event queuing table for a new definition * Create the event queuing table for a new definition
@ -734,14 +761,14 @@ CREATE FUNCTION events.evdef_create_queue_table( _evdef_id TEXT )
AS $evdef_create_queue_table$ AS $evdef_create_queue_table$
DECLARE DECLARE
_tsfx TEXT;
_rec RECORD; _rec RECORD;
_qstr TEXT; _qstr TEXT;
_fname TEXT; _fname TEXT;
BEGIN BEGIN
_tsfx := replace( _evdef_id , '-' , '_' );
_qstr := 'CREATE TABLE events.eq_' || replace( _evdef_id , '-' , '_' ) _qstr := 'CREATE TABLE events.eq_' || _tsfx || $tbl_def$ (
|| $tbl_def$ (
event_id BIGINT NOT NULL PRIMARY KEY event_id BIGINT NOT NULL PRIMARY KEY
DEFAULT nextval('events.event_id_sequence'::regclass) , DEFAULT nextval('events.event_id_sequence'::regclass) ,
event_rtime TIMESTAMP WITHOUT TIME ZONE NOT NULL event_rtime TIMESTAMP WITHOUT TIME ZONE NOT NULL
@ -782,9 +809,14 @@ BEGIN
_qstr := _qstr || ');'; _qstr := _qstr || ');';
EXECUTE _qstr; EXECUTE _qstr;
_qstr := 'GRANT INSERT ON TABLE events.eq_' || replace( _evdef_id , '-' , '_' ) _qstr := 'GRANT INSERT ON TABLE events.eq_' || _tsfx
|| ' TO $init_code$ || $1 || $init_code$;'; || ' TO $init_code$ || $1 || $init_code$;';
EXECUTE _qstr; EXECUTE _qstr;
_qstr := 'CREATE TRIGGER tg_eqi_' || _tsfx || ' AFTER INSERT ON events.eq_'
|| _tsfx || ' FOR EACH ROW EXECUTE PROCEDURE events.tgf_eq_insert('''
|| _evdef_id || ''');';
EXECUTE _qstr;
END; END;
$evdef_create_queue_table$; $init_code$ USING $1; END; $init_func$; $evdef_create_queue_table$; $init_code$ USING $1; END; $init_func$;
SELECT _temp_init_func(:dbuser_string); SELECT _temp_init_func(:dbuser_string);

View file

@ -0,0 +1,183 @@
-- LegacyWorlds Beta 6
-- PostgreSQL database scripts
--
-- Functions that extract event data from the queue tables
-- and store them in the long term storage tables.
--
-- Copyright(C) 2004-2012, DeepClone Development
-- --------------------------------------------------------
/*
* Entity fields table
* -------------------
*
* This table, which is filled when the database is created, associates entity
* types with fields in the events.field_values table.
*/
DROP TABLE IF EXISTS events._entity_fields;
CREATE TABLE events._entity_fields(
efdef_entity events.entity_field_type NOT NULL PRIMARY KEY ,
field_name NAME NOT NULL ,
field_type NAME NOT NULL
);
INSERT INTO events._entity_fields VALUES
( 'EMP' , 'empire_id' , 'INT' ) ,
( 'PLN' , 'planet_id' , 'INT' ) ,
( 'FLT' , 'fleet_id' , 'BIGINT' ) ,
( 'ALL' , 'alliance_id' , 'INT' ) ,
( 'BAT' , 'battle_id' , 'BIGINT' ) ,
( 'ADM' , 'admin_id' , 'INT' ) ,
( 'BUG' , 'bug_report_id' , 'BIGINT' );
/*
* Process a pending event
* -----------------------
*
* /!\ INTERNAL FUNCTION /!\
*
* This procedure processes a single row from an event queue.
*
* If the specified event identifier actually exists in the queue, it is
* converted and stored in the main events tables, then the queue entry is
* removed.
*
* In all cases, the entry is deleted from events.pending_events.
*
* Parameters:
* _event_id The event's identifier
* _evdef_id The event's type
*
* Returns:
* ??? TRUE
*/
DROP FUNCTION IF EXISTS events.eq_process_event( BIGINT , TEXT );
CREATE FUNCTION events.eq_process_event(
_event_id BIGINT ,
_evdef_id TEXT )
RETURNS BOOLEAN
LANGUAGE PLPGSQL
STRICT VOLATILE SECURITY INVOKER
AS $eq_process_event$
DECLARE
_tbl TEXT;
_qstr TEXT;
_qentry RECORD;
_nfound INT;
_efdef RECORD;
BEGIN
_tbl := 'events.eq_' || replace( _evdef_id , '-' , '_' );
_qstr := 'SELECT event_rtime , event_gtime , empire_id FROM '
|| _tbl || ' WHERE event_id = $1';
EXECUTE _qstr INTO _qentry USING _event_id;
GET DIAGNOSTICS _nfound = ROW_COUNT;
IF _nfound > 0
THEN
INSERT INTO events.events_v2 (
event_id , evdef_id , event_rtime , event_gtime , empire_id
) VALUES (
_event_id , _evdef_id , _qentry.event_rtime ,
_qentry.event_gtime , _qentry.empire_id
);
_qstr := format( $field_acquisition$
SELECT efdef_id , field_name , field_type ,
( efdef_type = 'I18N') AS i18n , _sq1.value
FROM events.field_definitions
INNER JOIN (
SELECT (each(hstore(_tbl))).*
FROM %s _tbl
WHERE _tbl.event_id = $1
) _sq1 ON _sq1.key = 'ef_' || replace(efdef_id , '-','_')
LEFT OUTER JOIN events._entity_fields
USING ( efdef_entity )
WHERE evdef_id = $2
$field_acquisition$ , _tbl );
FOR _efdef IN EXECUTE _qstr USING _event_id , _evdef_id
LOOP
_qstr := 'INSERT INTO events.field_values(event_id,evdef_id,efdef_id,efval_litteral'
|| ( CASE
WHEN _efdef.field_name IS NOT NULL
THEN ',' || _efdef.field_name
WHEN _efdef.i18n
THEN ',string_id'
ELSE
''
END ) || ') ';
IF _efdef.i18n
THEN
_qstr := _qstr || $i18n_query$
SELECT $1,$2,$3,_str.id::TEXT||' '||_str.name,_str.id
FROM defs.strings _str
WHERE _str.name = $4
$i18n_query$;
ELSIF _efdef.field_name IS NOT NULL
THEN
_qstr := _qstr || format( 'VALUES($1,$2,$3,$4,$4::%s)' , _efdef.field_type );
ELSE
_qstr := _qstr || 'VALUES($1,$2,$3,$4)';
END IF;
EXECUTE _qstr USING _event_id , _evdef_id , _efdef.efdef_id , _efdef.value;
END LOOP;
END IF;
DELETE FROM events.pending_events
WHERE event_id = _event_id;
RETURN TRUE;
END;
$eq_process_event$;
REVOKE EXECUTE
ON FUNCTION events.eq_process_event( BIGINT , TEXT )
FROM PUBLIC;
/*
* Process pending events
* ----------------------
*
* This procedure processes events from the queues. Each time the procedure is
* called, it will process at most "game.events.batchSize" events.
*
* Returns:
* ??? TRUE if events were processed, FALSE if the queues are
* empty
*/
DROP FUNCTION IF EXISTS events.eq_process( );
CREATE FUNCTION events.eq_process( )
RETURNS BOOLEAN
LANGUAGE PLPGSQL
STRICT VOLATILE SECURITY DEFINER
AS $eq_process$
DECLARE
_limit INT;
BEGIN
_limit := sys.get_constant( 'game.events.batchSize' );
PERFORM events.eq_process_event( event_id , evdef_id )
FROM events.pending_events
LIMIT _limit;
RETURN FOUND;
END;
$eq_process$;
REVOKE EXECUTE
ON FUNCTION events.eq_process( )
FROM PUBLIC;
GRANT EXECUTE
ON FUNCTION events.eq_process( )
TO :dbuser;