diff --git a/dependencies/Firebird.Embedded/aliases.conf b/dependencies/Firebird.Embedded/aliases.conf
new file mode 100644
index 0000000..eb8f2e1
--- /dev/null
+++ b/dependencies/Firebird.Embedded/aliases.conf
@@ -0,0 +1,8 @@
+#
+# List of known database aliases
+# ------------------------------
+#
+# Examples:
+#
+# dummy = c:\data\dummy.fdb
+#
diff --git a/dependencies/Firebird.Embedded/fbclient.dll b/dependencies/Firebird.Embedded/fbclient.dll
new file mode 100644
index 0000000..2a5239c
Binary files /dev/null and b/dependencies/Firebird.Embedded/fbclient.dll differ
diff --git a/dependencies/Firebird.Embedded/fbembed.dll b/dependencies/Firebird.Embedded/fbembed.dll
new file mode 100644
index 0000000..2a5239c
Binary files /dev/null and b/dependencies/Firebird.Embedded/fbembed.dll differ
diff --git a/dependencies/Firebird.Embedded/firebird.msg b/dependencies/Firebird.Embedded/firebird.msg
new file mode 100644
index 0000000..8f8bc8e
Binary files /dev/null and b/dependencies/Firebird.Embedded/firebird.msg differ
diff --git a/dependencies/Firebird.Embedded/ib_util.dll b/dependencies/Firebird.Embedded/ib_util.dll
new file mode 100644
index 0000000..9f8cdcb
Binary files /dev/null and b/dependencies/Firebird.Embedded/ib_util.dll differ
diff --git a/dependencies/Firebird.Embedded/icudt30.dll b/dependencies/Firebird.Embedded/icudt30.dll
new file mode 100644
index 0000000..0543d52
Binary files /dev/null and b/dependencies/Firebird.Embedded/icudt30.dll differ
diff --git a/dependencies/Firebird.Embedded/icuin30.dll b/dependencies/Firebird.Embedded/icuin30.dll
new file mode 100644
index 0000000..90929a4
Binary files /dev/null and b/dependencies/Firebird.Embedded/icuin30.dll differ
diff --git a/dependencies/Firebird.Embedded/icuuc30.dll b/dependencies/Firebird.Embedded/icuuc30.dll
new file mode 100644
index 0000000..f63a4d7
Binary files /dev/null and b/dependencies/Firebird.Embedded/icuuc30.dll differ
diff --git a/src/NEventStore.Persistence.FirebirdSql.Tests/NEventStore.Persistence.FirebirdSql.Tests.csproj b/src/NEventStore.Persistence.FirebirdSql.Tests/NEventStore.Persistence.FirebirdSql.Tests.csproj
new file mode 100644
index 0000000..ef7555e
--- /dev/null
+++ b/src/NEventStore.Persistence.FirebirdSql.Tests/NEventStore.Persistence.FirebirdSql.Tests.csproj
@@ -0,0 +1,139 @@
+
+
+
+
+ Debug
+ AnyCPU
+ {0429FD21-A8EF-4C73-967B-579A9C50CE2E}
+ Library
+ Properties
+ NEventStore.Persistence.AcceptanceTests
+ NEventStore.Persistence.FirebirdSql.Tests
+ v4.5
+ 512
+ ..\..\
+ true
+
+
+
+ true
+ full
+ false
+ bin\Debug\
+ DEBUG;TRACE
+ prompt
+ 4
+ false
+
+
+ pdbonly
+ true
+ bin\Release\
+ TRACE
+ prompt
+ 4
+ false
+
+
+
+ ..\packages\FirebirdSql.Data.FirebirdClient.4.6.2.0\lib\net45\FirebirdSql.Data.FirebirdClient.dll
+ True
+
+
+ False
+ ..\packages\Npgsql.2.0.11\lib\Net40\Mono.Security.dll
+
+
+
+
+
+
+ False
+ ..\packages\xunit.1.9.1\lib\net20\xunit.dll
+
+
+ False
+ ..\packages\xunit.should.1.1\lib\net35\xunit.should.dll
+
+
+
+
+ PersistenceTests.cs
+
+
+ Properties\GlobalAssemblyInfo.cs
+
+
+ Properties\AssemblyInfo.cs
+
+
+
+
+
+
+ {03946843-f343-419c-88ef-3e446d08dfa6}
+ NEventStore
+
+
+ {3fe594fe-16ff-4405-97d5-5a58fb12520b}
+ NEventStore.Persistence.AcceptanceTests
+
+
+ {D210B888-D588-4231-9DDB-CB01F57D35C5}
+ NEventStore.Persistence.Sql.Tests
+
+
+ {F9E7FD69-0818-48CA-9249-5387739E1B6A}
+ NEventStore.Persistence.Sql
+
+
+
+
+ aliases.conf
+ Always
+
+
+ firebird.msg
+ Always
+
+
+ Designer
+
+
+
+
+
+ fbclient.dll
+ Always
+
+
+ fbembed.dll
+ Always
+
+
+ ib_util.dll
+ Always
+
+
+ icudt30.dll
+ Always
+
+
+ icuin30.dll
+ Always
+
+
+ icuuc30.dll
+ Always
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/NEventStore.Persistence.FirebirdSql.Tests/PersistenceEngineFixture.cs b/src/NEventStore.Persistence.FirebirdSql.Tests/PersistenceEngineFixture.cs
new file mode 100644
index 0000000..dfa6156
--- /dev/null
+++ b/src/NEventStore.Persistence.FirebirdSql.Tests/PersistenceEngineFixture.cs
@@ -0,0 +1,38 @@
+namespace NEventStore.Persistence.AcceptanceTests
+{
+
+ using NEventStore.Persistence.Sql;
+ using NEventStore.Persistence.Sql.SqlDialects;
+ using NEventStore.Persistence.Sql.Tests;
+ using NEventStore.Serialization;
+
+ public partial class PersistenceEngineFixture
+ {
+ const string ConnectionString = "User=SYSDBA;Password=doesntmatter;Database=neventstore.fdb;DataSource=localhost;Port=3050;Dialect=3;Charset=UTF8;Role=;Connection lifetime=15;Pooling=true;MinPoolSize=0;MaxPoolSize=50;Packet Size=8192;ServerType=1;";
+ private const string EnvVariable = "NEventStore.FirebirdSql";
+
+ public PersistenceEngineFixture()
+ {
+ _createPersistence = pageSize =>
+ new FirebirdSqlPersistenceFactory(
+ new EnviromentConnectionFactory("FirebirdSql", "FirebirdSql"),
+ new JsonSerializer(),
+ new FirebirdSqlDialect(),
+ pageSize: pageSize).Build();
+ }
+
+ //partial void PrepEnvironment()
+ //{
+ // Environment.SetEnvironmentVariable(EnvVariable, ConnectionString, EnvironmentVariableTarget.Process);
+ // FbConnection.CreateDatabase(ConnectionString, true);
+
+ //}
+
+ //partial void CleanEnvironment()
+ //{
+ // Environment.SetEnvironmentVariable(EnvVariable, null, EnvironmentVariableTarget.Process);
+ // FbConnection.ClearAllPools();
+ // FbConnection.DropDatabase(ConnectionString);
+ //}
+ }
+}
\ No newline at end of file
diff --git a/src/NEventStore.Persistence.FirebirdSql.Tests/Properties/ProjectAssemblyInfo.cs b/src/NEventStore.Persistence.FirebirdSql.Tests/Properties/ProjectAssemblyInfo.cs
new file mode 100644
index 0000000..71322e7
--- /dev/null
+++ b/src/NEventStore.Persistence.FirebirdSql.Tests/Properties/ProjectAssemblyInfo.cs
@@ -0,0 +1,4 @@
+using System.Reflection;
+
+[assembly: AssemblyTitle("NEventStore.Persistence.PostgreSql.Tests")]
+[assembly: AssemblyDescription("")]
\ No newline at end of file
diff --git a/src/NEventStore.Persistence.FirebirdSql.Tests/app.config b/src/NEventStore.Persistence.FirebirdSql.Tests/app.config
new file mode 100644
index 0000000..1ef5d39
--- /dev/null
+++ b/src/NEventStore.Persistence.FirebirdSql.Tests/app.config
@@ -0,0 +1,9 @@
+
+
+
+
+
+
+
+
+
diff --git a/src/NEventStore.Persistence.FirebirdSql.Tests/neventstore.fdb b/src/NEventStore.Persistence.FirebirdSql.Tests/neventstore.fdb
new file mode 100644
index 0000000..5167b26
Binary files /dev/null and b/src/NEventStore.Persistence.FirebirdSql.Tests/neventstore.fdb differ
diff --git a/src/NEventStore.Persistence.FirebirdSql.Tests/packages.config b/src/NEventStore.Persistence.FirebirdSql.Tests/packages.config
new file mode 100644
index 0000000..970d1ac
--- /dev/null
+++ b/src/NEventStore.Persistence.FirebirdSql.Tests/packages.config
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/NEventStore.Persistence.MySql.Tests/PersistenceEngineFixture.cs b/src/NEventStore.Persistence.MySql.Tests/PersistenceEngineFixture.cs
index c267373..609d34d 100644
--- a/src/NEventStore.Persistence.MySql.Tests/PersistenceEngineFixture.cs
+++ b/src/NEventStore.Persistence.MySql.Tests/PersistenceEngineFixture.cs
@@ -1,12 +1,11 @@
-using NEventStore.Persistence.Sql.Tests;
-
-namespace NEventStore.Persistence.AcceptanceTests
+namespace NEventStore.Persistence.AcceptanceTests
{
- using NEventStore.Persistence.Sql;
- using NEventStore.Persistence.Sql.SqlDialects;
- using NEventStore.Serialization;
+ using NEventStore.Persistence.Sql;
+ using NEventStore.Persistence.Sql.SqlDialects;
+ using NEventStore.Persistence.Sql.Tests;
+ using NEventStore.Serialization;
- public partial class PersistenceEngineFixture
+ public partial class PersistenceEngineFixture
{
public PersistenceEngineFixture()
{
diff --git a/src/NEventStore.Persistence.Sql.sln b/src/NEventStore.Persistence.Sql.sln
index 4f9b77d..846edc3 100644
--- a/src/NEventStore.Persistence.Sql.sln
+++ b/src/NEventStore.Persistence.Sql.sln
@@ -1,7 +1,7 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 2013
-VisualStudioVersion = 12.0.30110.0
+VisualStudioVersion = 12.0.31101.0
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".nuget", ".nuget", "{09DEED55-B936-4C29-AC45-6844080C025D}"
ProjectSection(SolutionItems) = preProject
@@ -29,6 +29,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NEventStore.Persistence.Sql
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NEventStore.Persistence.Sql.Tests", "NEventStore.Persistence.Sql.Tests\NEventStore.Persistence.Sql.Tests.csproj", "{D210B888-D588-4231-9DDB-CB01F57D35C5}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NEventStore.Persistence.FirebirdSql.Tests", "NEventStore.Persistence.FirebirdSql.Tests\NEventStore.Persistence.FirebirdSql.Tests.csproj", "{0429FD21-A8EF-4C73-967B-579A9C50CE2E}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -71,6 +73,10 @@ Global
{D210B888-D588-4231-9DDB-CB01F57D35C5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D210B888-D588-4231-9DDB-CB01F57D35C5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D210B888-D588-4231-9DDB-CB01F57D35C5}.Release|Any CPU.Build.0 = Release|Any CPU
+ {0429FD21-A8EF-4C73-967B-579A9C50CE2E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {0429FD21-A8EF-4C73-967B-579A9C50CE2E}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {0429FD21-A8EF-4C73-967B-579A9C50CE2E}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {0429FD21-A8EF-4C73-967B-579A9C50CE2E}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
diff --git a/src/NEventStore.Persistence.Sql/FirebirdPersistenceEngine.cs b/src/NEventStore.Persistence.Sql/FirebirdPersistenceEngine.cs
new file mode 100644
index 0000000..bb3b352
--- /dev/null
+++ b/src/NEventStore.Persistence.Sql/FirebirdPersistenceEngine.cs
@@ -0,0 +1,551 @@
+namespace NEventStore.Persistence.Sql
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Linq;
+ using System.Data;
+ using System.Globalization;
+ using System.Text;
+ using System.Threading;
+ using System.Transactions;
+
+ using NEventStore.Logging;
+ using NEventStore.Serialization;
+
+ public class FirebirdPersistenceEngine : IPersistStreams
+ {
+ private static readonly ILog Logger = LogFactory.BuildLogger(typeof(SqlPersistenceEngine));
+ private static readonly DateTime EpochTime = new DateTime(1970, 1, 1);
+ private readonly IConnectionFactory _connectionFactory;
+ private readonly ISqlDialect _dialect;
+ private readonly int _pageSize;
+ private readonly TransactionScopeOption _scopeOption;
+ private readonly ISerialize _serializer;
+ private bool _disposed;
+ private int _initialized;
+ private readonly IStreamIdHasher _streamIdHasher;
+
+ public FirebirdPersistenceEngine(
+ IConnectionFactory connectionFactory,
+ ISqlDialect dialect,
+ ISerialize serializer,
+ TransactionScopeOption scopeOption,
+ int pageSize)
+ : this(connectionFactory, dialect, serializer, scopeOption, pageSize, new Sha1StreamIdHasher())
+ { }
+
+ public FirebirdPersistenceEngine(
+ IConnectionFactory connectionFactory,
+ ISqlDialect dialect,
+ ISerialize serializer,
+ TransactionScopeOption scopeOption,
+ int pageSize,
+ IStreamIdHasher streamIdHasher)
+ {
+ if (connectionFactory == null)
+ {
+ throw new ArgumentNullException("connectionFactory");
+ }
+
+ if (dialect == null)
+ {
+ throw new ArgumentNullException("dialect");
+ }
+
+ if (serializer == null)
+ {
+ throw new ArgumentNullException("serializer");
+ }
+
+ if (pageSize < 0)
+ {
+ throw new ArgumentException("pageSize");
+ }
+
+ if (streamIdHasher == null)
+ {
+ throw new ArgumentNullException("streamIdHasher");
+ }
+
+ _connectionFactory = connectionFactory;
+ _dialect = dialect;
+ _serializer = serializer;
+ _scopeOption = scopeOption;
+ _pageSize = pageSize;
+ _streamIdHasher = new StreamIdHasherValidator(streamIdHasher);
+
+ Logger.Debug(Messages.UsingScope, _scopeOption.ToString());
+ }
+
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ public virtual void Initialize()
+ {
+ if (Interlocked.Increment(ref _initialized) > 1)
+ {
+ return;
+ }
+
+ Logger.Debug(Messages.InitializingStorage);
+
+ string[] statements = _dialect.InitializeStorage.Split(new[] { "__" }, StringSplitOptions.RemoveEmptyEntries);
+
+ StringBuilder builder = null;
+ bool buildingSetTermStatement = false;
+
+ foreach (string s in statements)
+ {
+ ExecuteCommand(statement => statement.ExecuteWithoutExceptions(s.Trim()));
+ }
+ }
+
+ public virtual IEnumerable GetFrom(string bucketId, string streamId, int minRevision, int maxRevision)
+ {
+ Logger.Debug(Messages.GettingAllCommitsBetween, streamId, minRevision, maxRevision);
+ streamId = _streamIdHasher.GetHash(streamId);
+ return ExecuteQuery(query =>
+ {
+ string statement = _dialect.GetCommitsFromStartingRevision;
+ query.AddParameter(_dialect.BucketId, bucketId, DbType.AnsiString);
+ query.AddParameter(_dialect.StreamId, streamId, DbType.AnsiString);
+ query.AddParameter(_dialect.StreamRevision, minRevision);
+ query.AddParameter(_dialect.MaxStreamRevision, maxRevision);
+ query.AddParameter(_dialect.CommitSequence, 0);
+ return query
+ .ExecutePagedQuery(statement, _dialect.NextPageDelegate)
+ .Select(x => x.GetCommit(_serializer, _dialect));
+
+ /* return query
+ .ExecutePagedQuery(statement, (q, r) => {})
+ .Select(x => x.GetCommit(_serializer, _dialect));*/
+ });
+ }
+
+ public virtual IEnumerable GetFrom(string bucketId, DateTime start)
+ {
+ start = start.AddTicks(-(start.Ticks % TimeSpan.TicksPerSecond)); // Rounds down to the nearest second.
+ start = start < EpochTime ? EpochTime : start;
+
+ Logger.Debug(Messages.GettingAllCommitsFrom, start, bucketId);
+ return ExecuteQuery(query =>
+ {
+ string statement = _dialect.GetCommitsFromInstant;
+ query.AddParameter(_dialect.BucketId, bucketId, DbType.AnsiString);
+ query.AddParameter(_dialect.CommitStamp, start);
+ return query.ExecutePagedQuery(statement, (q, r) => { })
+ .Select(x => x.GetCommit(_serializer, _dialect));
+
+ });
+ }
+
+ public ICheckpoint GetCheckpoint(string checkpointToken)
+ {
+ return string.IsNullOrWhiteSpace(checkpointToken) ? null : LongCheckpoint.Parse(checkpointToken);
+ }
+
+ public virtual IEnumerable GetFromTo(string bucketId, DateTime start, DateTime end)
+ {
+ start = start.AddTicks(-(start.Ticks % TimeSpan.TicksPerSecond)); // Rounds down to the nearest second.
+ start = start < EpochTime ? EpochTime : start;
+ end = end < EpochTime ? EpochTime : end;
+
+ Logger.Debug(Messages.GettingAllCommitsFromTo, start, end);
+ return ExecuteQuery(query =>
+ {
+ string statement = _dialect.GetCommitsFromToInstant;
+ query.AddParameter(_dialect.BucketId, bucketId, DbType.AnsiString);
+ query.AddParameter(_dialect.CommitStampStart, start);
+ query.AddParameter(_dialect.CommitStampEnd, end);
+ return query.ExecutePagedQuery(statement, (q, r) => { })
+ .Select(x => x.GetCommit(_serializer, _dialect));
+ });
+ }
+
+ public virtual ICommit Commit(CommitAttempt attempt)
+ {
+ ICommit commit;
+ try
+ {
+ commit = PersistCommit(attempt);
+ Logger.Debug(Messages.CommitPersisted, attempt.CommitId);
+ }
+ catch (Exception e)
+ {
+ if (!(e is UniqueKeyViolationException))
+ {
+ throw;
+ }
+
+ if (DetectDuplicate(attempt))
+ {
+ Logger.Info(Messages.DuplicateCommit);
+ throw new DuplicateCommitException(e.Message, e);
+ }
+
+ Logger.Info(Messages.ConcurrentWriteDetected);
+ throw new ConcurrencyException(e.Message, e);
+ }
+ return commit;
+ }
+
+ public virtual IEnumerable GetUndispatchedCommits()
+ {
+ Logger.Debug(Messages.GettingUndispatchedCommits);
+ return
+ ExecuteQuery(query => query.ExecutePagedQuery(_dialect.GetUndispatchedCommits, (q, r) => { }))
+ .Select(x => x.GetCommit(_serializer, _dialect))
+ .ToArray(); // avoid paging
+ }
+
+ public virtual void MarkCommitAsDispatched(ICommit commit)
+ {
+ Logger.Debug(Messages.MarkingCommitAsDispatched, commit.CommitId);
+ string streamId = _streamIdHasher.GetHash(commit.StreamId);
+ ExecuteCommand(cmd =>
+ {
+ cmd.AddParameter(_dialect.BucketId, commit.BucketId, DbType.AnsiString);
+ cmd.AddParameter(_dialect.StreamId, streamId, DbType.AnsiString);
+ cmd.AddParameter(_dialect.CommitSequence, commit.CommitSequence);
+ return cmd.ExecuteWithoutExceptions(_dialect.MarkCommitAsDispatched);
+ });
+ }
+
+ public virtual IEnumerable GetStreamsToSnapshot(string bucketId, int maxThreshold)
+ {
+ Logger.Debug(Messages.GettingStreamsToSnapshot);
+ return ExecuteQuery(query =>
+ {
+ string statement = _dialect.GetStreamsRequiringSnapshots;
+ query.AddParameter(_dialect.BucketId, bucketId, DbType.AnsiString);
+ query.AddParameter(_dialect.Threshold, maxThreshold);
+ return
+ query.ExecutePagedQuery(statement,
+ (q, s) => q.SetParameter(_dialect.StreamId, _dialect.CoalesceParameterValue(s.StreamId()), DbType.AnsiString))
+ .Select(x => x.GetStreamToSnapshot());
+ });
+ }
+
+ public virtual ISnapshot GetSnapshot(string bucketId, string streamId, int maxRevision)
+ {
+ Logger.Debug(Messages.GettingRevision, streamId, maxRevision);
+ var streamIdHash = _streamIdHasher.GetHash(streamId);
+ return ExecuteQuery(query =>
+ {
+ string statement = _dialect.GetSnapshot;
+ query.AddParameter(_dialect.BucketId, bucketId, DbType.AnsiString);
+ query.AddParameter(_dialect.StreamId, streamIdHash, DbType.AnsiString);
+ query.AddParameter(_dialect.StreamRevision, maxRevision);
+ return query.ExecuteWithQuery(statement).Select(x => x.GetSnapshot(_serializer, streamId));
+ }).FirstOrDefault();
+ }
+
+ public virtual bool AddSnapshot(ISnapshot snapshot)
+ {
+ Logger.Debug(Messages.AddingSnapshot, snapshot.StreamId, snapshot.StreamRevision);
+ string streamId = _streamIdHasher.GetHash(snapshot.StreamId);
+ return ExecuteCommand((connection, cmd) =>
+ {
+ cmd.AddParameter(_dialect.BucketId, snapshot.BucketId, DbType.AnsiString);
+ cmd.AddParameter(_dialect.StreamId, streamId, DbType.AnsiString);
+ cmd.AddParameter(_dialect.StreamRevision, snapshot.StreamRevision);
+ _dialect.AddPayloadParamater(_connectionFactory, connection, cmd, _serializer.Serialize(snapshot.Payload));
+ return cmd.ExecuteWithoutExceptions(_dialect.AppendSnapshotToCommit);
+ }) > 0;
+ }
+
+ public virtual void Purge()
+ {
+ Logger.Warn(Messages.PurgingStorage);
+ foreach (var s in _dialect.PurgeStorage.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries))
+ {
+ ExecuteCommand(cmd => cmd.ExecuteNonQuery(s.Trim()));
+ }
+ }
+
+ public void Purge(string bucketId)
+ {
+ Logger.Warn(Messages.PurgingBucket, bucketId);
+ ExecuteCommand(cmd =>
+ {
+ cmd.AddParameter(_dialect.BucketId, bucketId, DbType.AnsiString);
+ return cmd.ExecuteNonQuery(_dialect.PurgeBucket);
+ });
+ }
+
+ public void Drop()
+ {
+ Logger.Warn(Messages.DroppingTables);
+
+ string[] tablesNames = _dialect.Drop.Split(new[] { ";" }, StringSplitOptions.RemoveEmptyEntries);
+
+ foreach (string name in tablesNames)
+ {
+ ExecuteCommand(cmd => cmd.ExecuteNonQuery(name.Trim() + ";"));
+ }
+ }
+
+ public void DeleteStream(string bucketId, string streamId)
+ {
+ Logger.Warn(Messages.DeletingStream, streamId, bucketId);
+ streamId = _streamIdHasher.GetHash(streamId);
+ ExecuteCommand(cmd =>
+ {
+ cmd.AddParameter(_dialect.BucketId, bucketId, DbType.AnsiString);
+ cmd.AddParameter(_dialect.StreamId, streamId, DbType.AnsiString);
+ return cmd.ExecuteNonQuery(_dialect.DeleteStream);
+ });
+ }
+
+ public IEnumerable GetFrom(string bucketId, string checkpointToken)
+ {
+ LongCheckpoint checkpoint = LongCheckpoint.Parse(checkpointToken);
+ Logger.Debug(Messages.GettingAllCommitsFromBucketAndCheckpoint, bucketId, checkpointToken);
+ return ExecuteQuery(query =>
+ {
+ string statement = _dialect.GetCommitsFromBucketAndCheckpoint;
+ query.AddParameter(_dialect.BucketId, bucketId, DbType.AnsiString);
+ query.AddParameter(_dialect.CheckpointNumber, checkpoint.LongValue);
+ return query.ExecutePagedQuery(statement, (q, r) => { })
+ .Select(x => x.GetCommit(_serializer, _dialect));
+ });
+ }
+
+ public IEnumerable GetFrom(string checkpointToken)
+ {
+ LongCheckpoint checkpoint = LongCheckpoint.Parse(checkpointToken);
+ Logger.Debug(Messages.GettingAllCommitsFromCheckpoint, checkpointToken);
+ return ExecuteQuery(query =>
+ {
+ string statement = _dialect.GetCommitsFromCheckpoint;
+ query.AddParameter(_dialect.CheckpointNumber, checkpoint.LongValue);
+ return query.ExecutePagedQuery(statement, (q, r) => { })
+ .Select(x => x.GetCommit(_serializer, _dialect));
+ });
+ }
+
+ public bool IsDisposed
+ {
+ get { return _disposed; }
+ }
+
+ protected virtual void Dispose(bool disposing)
+ {
+ if (!disposing || _disposed)
+ {
+ return;
+ }
+
+ Logger.Debug(Messages.ShuttingDownPersistence);
+ _disposed = true;
+ }
+
+ protected virtual void OnPersistCommit(IDbStatement cmd, CommitAttempt attempt)
+ { }
+
+ private ICommit PersistCommit(CommitAttempt attempt)
+ {
+ Logger.Debug(Messages.AttemptingToCommit, attempt.Events.Count, attempt.StreamId, attempt.CommitSequence, attempt.BucketId);
+ string streamId = _streamIdHasher.GetHash(attempt.StreamId);
+ return ExecuteCommand((connection, cmd) =>
+ {
+ cmd.AddParameter(_dialect.BucketId, attempt.BucketId, DbType.AnsiString);
+ cmd.AddParameter(_dialect.StreamId, streamId, DbType.AnsiString);
+ cmd.AddParameter(_dialect.StreamIdOriginal, attempt.StreamId);
+ cmd.AddParameter(_dialect.StreamRevision, attempt.StreamRevision);
+ cmd.AddParameter(_dialect.Items, attempt.Events.Count);
+ cmd.AddParameter(_dialect.CommitId, attempt.CommitId);
+ cmd.AddParameter(_dialect.CommitSequence, attempt.CommitSequence);
+ cmd.AddParameter(_dialect.CommitStamp, attempt.CommitStamp);
+ cmd.AddParameter(_dialect.Headers, _serializer.Serialize(attempt.Headers));
+ _dialect.AddPayloadParamater(_connectionFactory, connection, cmd, _serializer.Serialize(attempt.Events.ToList()));
+ OnPersistCommit(cmd, attempt);
+ var checkpointNumber = cmd.ExecuteScalar(_dialect.PersistCommit).ToLong();
+ return new Commit(
+ attempt.BucketId,
+ attempt.StreamId,
+ attempt.StreamRevision,
+ attempt.CommitId,
+ attempt.CommitSequence,
+ attempt.CommitStamp,
+ checkpointNumber.ToString(CultureInfo.InvariantCulture),
+ attempt.Headers,
+ attempt.Events);
+ });
+ }
+
+ private bool DetectDuplicate(CommitAttempt attempt)
+ {
+ string streamId = _streamIdHasher.GetHash(attempt.StreamId);
+ return ExecuteCommand(cmd =>
+ {
+ cmd.AddParameter(_dialect.BucketId, attempt.BucketId, DbType.AnsiString);
+ cmd.AddParameter(_dialect.StreamId, streamId, DbType.AnsiString);
+ cmd.AddParameter(_dialect.CommitId, attempt.CommitId);
+ cmd.AddParameter(_dialect.CommitSequence, attempt.CommitSequence);
+ object value = cmd.ExecuteScalar(_dialect.DuplicateCommit);
+ return (value is long ? (long)value : (int)value) > 0;
+ });
+ }
+
+ protected virtual IEnumerable ExecuteQuery(Func> query)
+ {
+ ThrowWhenDisposed();
+
+ TransactionScope scope = OpenQueryScope();
+ IDbConnection connection = null;
+ IDbTransaction transaction = null;
+ IDbStatement statement = null;
+
+ try
+ {
+ connection = _connectionFactory.Open();
+ transaction = _dialect.OpenTransaction(connection);
+ statement = _dialect.BuildStatement(scope, connection, transaction);
+ statement.PageSize = _pageSize;
+
+ Logger.Verbose(Messages.ExecutingQuery);
+ return query(statement);
+ }
+ catch (Exception e)
+ {
+ if (statement != null)
+ {
+ statement.Dispose();
+ }
+ if (transaction != null)
+ {
+ transaction.Dispose();
+ }
+ if (connection != null)
+ {
+ connection.Dispose();
+ }
+ if (scope != null)
+ {
+ scope.Dispose();
+ }
+
+ Logger.Debug(Messages.StorageThrewException, e.GetType());
+ if (e is StorageUnavailableException)
+ {
+ throw;
+ }
+
+ throw new StorageException(e.Message, e);
+ }
+ }
+
+ protected virtual TransactionScope OpenQueryScope()
+ {
+ return OpenCommandScope() ?? new TransactionScope(TransactionScopeOption.Suppress);
+ }
+
+ private void ThrowWhenDisposed()
+ {
+ if (!_disposed)
+ {
+ return;
+ }
+
+ Logger.Warn(Messages.AlreadyDisposed);
+ throw new ObjectDisposedException(Messages.AlreadyDisposed);
+ }
+
+ private T ExecuteCommand(Func command)
+ {
+ return ExecuteCommand((_, statement) => command(statement));
+ }
+
+ protected virtual T ExecuteCommand(Func command)
+ {
+ ThrowWhenDisposed();
+
+ using (TransactionScope scope = OpenCommandScope())
+ using (IDbConnection connection = _connectionFactory.Open())
+ using (IDbTransaction transaction = _dialect.OpenTransaction(connection))
+ using (IDbStatement statement = _dialect.BuildStatement(scope, connection, transaction))
+ {
+ try
+ {
+ Logger.Verbose(Messages.ExecutingCommand);
+ T rowsAffected = command(connection, statement);
+ Logger.Verbose(Messages.CommandExecuted, rowsAffected);
+
+ if (transaction != null)
+ {
+ transaction.Commit();
+ }
+
+ if (scope != null)
+ {
+ scope.Complete();
+ }
+
+ return rowsAffected;
+ }
+ catch (Exception e)
+ {
+ Logger.Debug(Messages.StorageThrewException, e.GetType());
+ if (!RecoverableException(e))
+ {
+ throw new StorageException(e.Message, e);
+ }
+
+ Logger.Info(Messages.RecoverableExceptionCompletesScope);
+ if (scope != null)
+ {
+ scope.Complete();
+ }
+
+ throw;
+ }
+ }
+ }
+
+ protected virtual TransactionScope OpenCommandScope()
+ {
+ return new TransactionScope(_scopeOption);
+ }
+
+ private static bool RecoverableException(Exception e)
+ {
+ return e is UniqueKeyViolationException || e is StorageUnavailableException;
+ }
+
+ private class StreamIdHasherValidator : IStreamIdHasher
+ {
+ private readonly IStreamIdHasher _streamIdHasher;
+ private const int MaxStreamIdHashLength = 40;
+
+ public StreamIdHasherValidator(IStreamIdHasher streamIdHasher)
+ {
+ if (streamIdHasher == null)
+ {
+ throw new ArgumentNullException("streamIdHasher");
+ }
+ _streamIdHasher = streamIdHasher;
+ }
+ public string GetHash(string streamId)
+ {
+ if (string.IsNullOrWhiteSpace(streamId))
+ {
+ throw new ArgumentException(Messages.StreamIdIsNullEmptyOrWhiteSpace);
+ }
+ string streamIdHash = _streamIdHasher.GetHash(streamId);
+ if (string.IsNullOrWhiteSpace(streamIdHash))
+ {
+ throw new InvalidOperationException(Messages.StreamIdHashIsNullEmptyOrWhiteSpace);
+ }
+ if (streamIdHash.Length > MaxStreamIdHashLength)
+ {
+ throw new InvalidOperationException(Messages.StreamIdHashTooLong.FormatWith(streamId, streamIdHash, streamIdHash.Length, MaxStreamIdHashLength));
+ }
+ return streamIdHash;
+ }
+ }
+ }
+}
diff --git a/src/NEventStore.Persistence.Sql/FirebirdSqlPersistenceFactory.cs b/src/NEventStore.Persistence.Sql/FirebirdSqlPersistenceFactory.cs
new file mode 100644
index 0000000..d6fe9ff
--- /dev/null
+++ b/src/NEventStore.Persistence.Sql/FirebirdSqlPersistenceFactory.cs
@@ -0,0 +1,26 @@
+namespace NEventStore.Persistence.Sql
+{
+ using System.Transactions;
+
+ using NEventStore.Serialization;
+
+ public class FirebirdSqlPersistenceFactory : SqlPersistenceFactory
+ {
+ private readonly TransactionScopeOption _transactionScopeOption;
+
+ public FirebirdSqlPersistenceFactory(string connectionName, ISerialize serializer, ISqlDialect dialect = null)
+ : base(connectionName, serializer, dialect)
+ {}
+
+ public FirebirdSqlPersistenceFactory(IConnectionFactory factory, ISerialize serializer, ISqlDialect dialect, IStreamIdHasher streamIdHasher = null, TransactionScopeOption scopeOption = TransactionScopeOption.Suppress, int pageSize = 128)
+ : base(factory, serializer, dialect, streamIdHasher, scopeOption, pageSize)
+ {
+ this._transactionScopeOption = scopeOption;
+ }
+
+ public override IPersistStreams Build()
+ {
+ return new FirebirdPersistenceEngine(base.ConnectionFactory, base.Dialect, base.Serializer, _transactionScopeOption, base.PageSize, base.StreamIdHasher);
+ }
+ }
+}
diff --git a/src/NEventStore.Persistence.Sql/FirebirdSqlPersistenceWireup.cs b/src/NEventStore.Persistence.Sql/FirebirdSqlPersistenceWireup.cs
new file mode 100644
index 0000000..63031e1
--- /dev/null
+++ b/src/NEventStore.Persistence.Sql/FirebirdSqlPersistenceWireup.cs
@@ -0,0 +1,83 @@
+namespace NEventStore
+{
+ using System.Transactions;
+ using System;
+
+ using NEventStore.Logging;
+ using NEventStore.Persistence.Sql;
+ using NEventStore.Serialization;
+
+ ///
+ /// Class FirebirdSqlPersistenceWireup. Allows the usage of the FirebirdSqlPersistenceFactory which sends differente statements in different commands to the database.
+ /// This is due to a problem with the .NET Provider.
+ ///
+ public class FirebirdSqlPersistenceWireup: PersistenceWireup
+ {
+ private const int DefaultPageSize = 512;
+ private static readonly ILog Logger = LogFactory.BuildLogger(typeof(SqlPersistenceWireup));
+ private int _pageSize = DefaultPageSize;
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The wireup.
+ /// The connection factory.
+ public FirebirdSqlPersistenceWireup(Wireup wireup, IConnectionFactory connectionFactory)
+ : base(wireup)
+ {
+ Container.Register(c => null); // auto-detect
+ Container.Register(c => new Sha1StreamIdHasher());
+
+ Container.Register(c => new FirebirdSqlPersistenceFactory(
+ connectionFactory,
+ c.Resolve(),
+ c.Resolve(),
+ c.Resolve(),
+ c.Resolve(),
+ _pageSize).Build());
+ }
+
+ ///
+ /// Withes the dialect.
+ ///
+ /// The instance.
+ /// FirebirdSqlPersistenceWireup.
+ public virtual FirebirdSqlPersistenceWireup WithDialect(ISqlDialect instance)
+ {
+ Container.Register(instance);
+ return this;
+ }
+
+ ///
+ /// Pages the every.
+ ///
+ /// The records.
+ /// FirebirdSqlPersistenceWireup.
+ public virtual FirebirdSqlPersistenceWireup PageEvery(int records)
+ {
+ _pageSize = records;
+ return this;
+ }
+
+ ///
+ /// Withes the stream identifier hasher.
+ ///
+ /// The instance.
+ /// FirebirdSqlPersistenceWireup.
+ public virtual FirebirdSqlPersistenceWireup WithStreamIdHasher(IStreamIdHasher instance)
+ {
+ Container.Register(instance);
+ return this;
+ }
+
+ ///
+ /// Withes the stream identifier hasher.
+ ///
+ /// The get stream identifier hash.
+ /// FirebirdSqlPersistenceWireup.
+ public virtual FirebirdSqlPersistenceWireup WithStreamIdHasher(Func getStreamIdHash)
+ {
+ return WithStreamIdHasher(new DelegateStreamIdHasher(getStreamIdHash));
+ }
+ }
+}
diff --git a/src/NEventStore.Persistence.Sql/FirebirdWireupExtension.cs b/src/NEventStore.Persistence.Sql/FirebirdWireupExtension.cs
new file mode 100644
index 0000000..8ebc208
--- /dev/null
+++ b/src/NEventStore.Persistence.Sql/FirebirdWireupExtension.cs
@@ -0,0 +1,31 @@
+namespace NEventStore
+{
+ using NEventStore.Persistence.Sql;
+
+ ///
+ /// Extensions which allows usign firebird-specific Persistence due to the existing problems with sending more than one statement in a single command.
+ ///
+ public static class FirebirdWireupExtension
+ {
+ ///
+ /// Extension method which allows using the FirebirdPersistenceEngine instead of SqlPersistenceEngine.
+ /// This Engine sends statements in different commands.
+ ///
+ /// The wireup.
+ /// Name of the connection.
+ /// if set to true [initialize].
+ /// FirebirdSqlPersistenceWireup.
+ public static FirebirdSqlPersistenceWireup UsingFirebirdPersistence(this Wireup wireup, string connectionName, bool initialize = false)
+ {
+ var factory = new ConfigurationConnectionFactory(connectionName);
+ FirebirdSqlPersistenceWireup wire = new FirebirdSqlPersistenceWireup(wireup, factory);
+
+ if (initialize)
+ {
+ wire.InitializeStorageEngine();
+ }
+
+ return wire;
+ }
+ }
+}
diff --git a/src/NEventStore.Persistence.Sql/NEventStore.Persistence.Sql.csproj b/src/NEventStore.Persistence.Sql/NEventStore.Persistence.Sql.csproj
index 3397f72..a75ebe4 100644
--- a/src/NEventStore.Persistence.Sql/NEventStore.Persistence.Sql.csproj
+++ b/src/NEventStore.Persistence.Sql/NEventStore.Persistence.Sql.csproj
@@ -46,6 +46,10 @@
+
+
+
+
@@ -66,6 +70,12 @@
CommonSqlStatements.resx
+
+
+ True
+ True
+ FirebirdSqlStatements.resx
+
True
@@ -129,6 +139,10 @@
ResXFileCodeGenerator
CommonSqlStatements.Designer.cs
+
+ ResXFileCodeGenerator
+ FirebirdSqlStatements1.Designer.cs
+
ResXFileCodeGenerator
MsSqlStatements.Designer.cs
diff --git a/src/NEventStore.Persistence.Sql/Sha1StreamIdHasher.cs b/src/NEventStore.Persistence.Sql/Sha1StreamIdHasher.cs
index ef4a7c4..93765e7 100644
--- a/src/NEventStore.Persistence.Sql/Sha1StreamIdHasher.cs
+++ b/src/NEventStore.Persistence.Sql/Sha1StreamIdHasher.cs
@@ -1,15 +1,15 @@
namespace NEventStore.Persistence.Sql
{
- using System;
- using System.Security.Cryptography;
- using System.Text;
+ using System;
+ using System.Security.Cryptography;
+ using System.Text;
- public class Sha1StreamIdHasher : IStreamIdHasher
- {
- public string GetHash(string streamId)
- {
- byte[] hashBytes = SHA1.Create().ComputeHash(Encoding.UTF8.GetBytes(streamId));
- return BitConverter.ToString(hashBytes).Replace("-", "");
- }
- }
+ public class Sha1StreamIdHasher : IStreamIdHasher
+ {
+ public string GetHash(string streamId)
+ {
+ byte[] hashBytes = SHA1.Create().ComputeHash(Encoding.UTF8.GetBytes(streamId));
+ return BitConverter.ToString(hashBytes).Replace("-", "");
+ }
+ }
}
\ No newline at end of file
diff --git a/src/NEventStore.Persistence.Sql/SqlDialects/FirebirdSqlDialect.cs b/src/NEventStore.Persistence.Sql/SqlDialects/FirebirdSqlDialect.cs
new file mode 100644
index 0000000..16e5cf7
--- /dev/null
+++ b/src/NEventStore.Persistence.Sql/SqlDialects/FirebirdSqlDialect.cs
@@ -0,0 +1,71 @@
+namespace NEventStore.Persistence.Sql.SqlDialects
+{
+ using System;
+
+ public class FirebirdSqlDialect : CommonSqlDialect
+ {
+ public override string InitializeStorage
+ {
+ get { return FirebirdSqlStatements.InitializeStorage; }
+ }
+
+ public override string PersistCommit
+ {
+ get { return FirebirdSqlStatements.PersistCommits; }
+ }
+
+ public override string GetUndispatchedCommits
+ {
+ get { return FirebirdSqlStatements.GetUndispatchedCommits; }
+ }
+
+ public override string GetCommitsFromInstant
+ {
+ get { return FirebirdSqlStatements.GetCommitsFromInstant; }
+ }
+
+ public override string GetCommitsFromToInstant
+ {
+ get { return FirebirdSqlStatements.GetCommitsFromToInstant; }
+ }
+
+ public override string GetCommitsFromStartingRevision
+ {
+ get { return FirebirdSqlStatements.GetCommitsFromStartingRevision; }
+ }
+
+ public override string GetCommitsFromBucketAndCheckpoint
+ {
+ get
+ {
+ return FirebirdSqlStatements.GetCommitsFromBucketAndCheckpoint;
+ }
+ }
+
+ public override string GetSnapshot
+ {
+ get { return FirebirdSqlStatements.GetSnapshot; }
+ }
+
+ public override string GetStreamsRequiringSnapshots
+ {
+ get { return FirebirdSqlStatements.GetStreamsRequiringSnapshots; }
+ }
+
+ public override string GetCommitsFromCheckpoint
+ {
+ get { return FirebirdSqlStatements.GetCommitsFromCheckpoint; }
+ }
+
+ public override bool IsDuplicate(Exception exception)
+ {
+ string message = exception.Message.ToUpperInvariant();
+ return message.Contains("DUPLICATE") || message.Contains("UNIQUE") || message.Contains("CONSTRAINT");
+ }
+
+ public override string AppendSnapshotToCommit
+ {
+ get { return FirebirdSqlStatements.AppendSnapshotToCommit; }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/NEventStore.Persistence.Sql/SqlDialects/FirebirdSqlStatements.resx b/src/NEventStore.Persistence.Sql/SqlDialects/FirebirdSqlStatements.resx
new file mode 100644
index 0000000..24d1774
--- /dev/null
+++ b/src/NEventStore.Persistence.Sql/SqlDialects/FirebirdSqlStatements.resx
@@ -0,0 +1,235 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ text/microsoft-resx
+
+
+ 2.0
+
+
+ System.Resources.ResXResourceReader, System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089
+
+
+ System.Resources.ResXResourceWriter, System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089
+
+
+ CREATE TABLE Commits
+ (
+ BucketId varchar(40) NOT NULL,
+ StreamId char(40) NOT NULL,
+ StreamIdOriginal varchar (1000) NOT NULL,
+ StreamRevision int NOT NULL CHECK (StreamRevision > 0),
+ Items int NOT NULL CHECK (Items > 0),
+ CommitId char(16) character set octets NOT NULL,
+ CommitSequence int NOT NULL CHECK (CommitSequence > 0),
+ CommitStamp timestamp NOT NULL,
+ CheckpointNumber int PRIMARY KEY,
+ Dispatched char (1) DEFAULT '0' NOT NULL,
+ Headers blob,
+ Payload blob NOT NULL
+ );__
+CREATE GENERATOR gen_cp_CpNo;__
+SET GENERATOR gen_cp_CpNo TO 0;__
+CREATE TRIGGER TCheckPoint FOR Commits
+ACTIVE BEFORE INSERT POSITION 0
+AS
+BEGIN
+if (NEW.CheckpointNumber is NULL) then NEW.CheckpointNumber = GEN_ID(gen_cp_CpNo, 1);
+END;__
+CREATE UNIQUE INDEX IX_Commits_CommitSequence ON Commits (BucketId, StreamId, CommitSequence);__
+CREATE UNIQUE INDEX IX_Commits_CommitId ON Commits (BucketId, StreamId, CommitId);__
+CREATE UNIQUE INDEX IX_Commits_Revisions ON Commits (BucketId, StreamId, StreamRevision, Items);__
+CREATE INDEX IX_Commits_Dispatched ON Commits (Dispatched);__
+CREATE INDEX IX_Commits_Stamp ON Commits (CommitStamp);__
+CREATE TABLE Snapshots
+(
+ BucketId varchar(40) NOT NULL,
+ StreamId char(40) NOT NULL,
+ StreamRevision int NOT NULL CHECK (StreamRevision > 0),
+ Payload blob NOT NULL,
+ CONSTRAINT PK_Snapshots PRIMARY KEY (BucketId, StreamId, StreamRevision)
+);
+
+
+ INSERT
+ INTO Commits
+ ( BucketId, StreamId, StreamIdOriginal, CommitId, CommitSequence, StreamRevision, Items, CommitStamp, Headers, Payload )
+VALUES (@BucketId, @StreamId, @StreamIdOriginal, @CommitId, @CommitSequence, @StreamRevision, @Items, @CommitStamp, @Headers, @Payload)
+RETURNING CheckpointNumber;
+
+
+ SELECT FIRST @Limit SKIP @Skip BucketId, StreamId, StreamIdOriginal, StreamRevision, CommitId, CommitSequence, CommitStamp, CheckpointNumber, Headers, Payload
+ FROM Commits
+ WHERE BucketId = @BucketId AND CommitStamp >= @CommitStamp
+ ORDER BY CommitStamp, StreamId, CommitSequence;
+
+
+ SELECT FIRST @Limit SKIP @Skip BucketId, StreamId, StreamIdOriginal, StreamRevision, CommitId, CommitSequence, CommitStamp, CheckpointNumber, Headers, Payload
+ FROM Commits
+ WHERE BucketId = @BucketId
+ AND CommitStamp >= @CommitStampStart
+ AND CommitStamp < @CommitStampEnd
+ ORDER BY CommitStamp, StreamId, CommitSequence;
+
+
+ SELECT FIRST @Limit BucketId, StreamId, StreamIdOriginal, StreamRevision, CommitId, CommitSequence, CommitStamp, CheckpointNumber, Headers, Payload
+ FROM Commits
+ WHERE BucketId = @BucketId
+ AND StreamId = @StreamId
+ AND StreamRevision >= @StreamRevision
+ AND (StreamRevision - Items) < @MaxStreamRevision
+ AND CommitSequence > @CommitSequence
+ ORDER BY CommitSequence;
+
+
+ SELECT FIRST 1 *
+ FROM Snapshots
+ WHERE BucketId = @BucketId
+ AND StreamId = @StreamId
+ AND StreamRevision <= @StreamRevision
+ ORDER BY StreamRevision DESC;
+
+
+ SELECT FIRST @Limit C.BucketId, C.StreamId, C.StreamIdOriginal, MAX(C.StreamRevision) AS StreamRevision, MAX(COALESCE(S.StreamRevision, 0)) AS SnapshotRevision
+ FROM Commits AS C
+ LEFT OUTER JOIN Snapshots AS S
+ ON C.BucketId = @BucketId
+ AND C.StreamId = S.StreamId
+ AND C.StreamRevision >= S.StreamRevision
+ GROUP BY C.StreamId, C.BucketId, C.StreamIdOriginal
+HAVING MAX(C.StreamRevision) >= MAX(COALESCE(S.StreamRevision, 0)) + @Threshold
+ ORDER BY C.StreamId;
+
+
+ SELECT FIRST @Limit SKIP @Skip BucketId, StreamId, StreamIdOriginal, StreamRevision, CommitId, CommitSequence, CommitStamp, CheckpointNumber, Headers, Payload
+ FROM Commits
+ WHERE Dispatched = 0
+ ORDER BY CheckpointNumber;
+
+
+ SELECT FIRST @Limit SKIP @Skip BucketId, StreamId, StreamIdOriginal, StreamRevision, CommitId, CommitSequence, CommitStamp, CheckpointNumber, Headers, Payload
+FROM Commits
+WHERE CheckpointNumber > @CheckpointNumber
+ORDER BY CheckpointNumber;
+
+
+ INSERT
+INTO Snapshots
+ ( BucketId, StreamId, StreamRevision, Payload )
+SELECT @BucketId, @StreamId, @StreamRevision, @Payload FROM rdb$database
+WHERE EXISTS ( SELECT * FROM Commits WHERE BucketId = @BucketId AND StreamId = @StreamId AND (StreamRevision - Items) <= @StreamRevision )
+AND NOT EXISTS ( SELECT * FROM Snapshots WHERE BucketId = @BucketId AND StreamId = @StreamId AND StreamRevision = @StreamRevision );
+
+
+ SELECT FIRST @Limit SKIP @Skip BucketId, StreamId, StreamIdOriginal, StreamRevision, CommitId, CommitSequence, CommitStamp, CheckpointNumber, Headers, Payload
+ FROM Commits
+ WHERE BucketId = @BucketId
+ AND CheckpointNumber > @CheckpointNumber
+ ORDER BY CheckpointNumber;
+
+
\ No newline at end of file
diff --git a/src/NEventStore.Persistence.Sql/SqlDialects/FirebirdSqlStatements1.Designer.cs b/src/NEventStore.Persistence.Sql/SqlDialects/FirebirdSqlStatements1.Designer.cs
new file mode 100644
index 0000000..8c86e2c
--- /dev/null
+++ b/src/NEventStore.Persistence.Sql/SqlDialects/FirebirdSqlStatements1.Designer.cs
@@ -0,0 +1,222 @@
+//------------------------------------------------------------------------------
+//
+// This code was generated by a tool.
+// Runtime Version:4.0.30319.42000
+//
+// Changes to this file may cause incorrect behavior and will be lost if
+// the code is regenerated.
+//
+//------------------------------------------------------------------------------
+
+namespace NEventStore.Persistence.Sql.SqlDialects {
+ using System;
+
+
+ ///
+ /// A strongly-typed resource class, for looking up localized strings, etc.
+ ///
+ // This class was auto-generated by the StronglyTypedResourceBuilder
+ // class via a tool like ResGen or Visual Studio.
+ // To add or remove a member, edit your .ResX file then rerun ResGen
+ // with the /str option, or rebuild your VS project.
+ [global::System.CodeDom.Compiler.GeneratedCodeAttribute("System.Resources.Tools.StronglyTypedResourceBuilder", "4.0.0.0")]
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute()]
+ [global::System.Runtime.CompilerServices.CompilerGeneratedAttribute()]
+ internal class FirebirdSqlStatements {
+
+ private static global::System.Resources.ResourceManager resourceMan;
+
+ private static global::System.Globalization.CultureInfo resourceCulture;
+
+ [global::System.Diagnostics.CodeAnalysis.SuppressMessageAttribute("Microsoft.Performance", "CA1811:AvoidUncalledPrivateCode")]
+ internal FirebirdSqlStatements() {
+ }
+
+ ///
+ /// Returns the cached ResourceManager instance used by this class.
+ ///
+ [global::System.ComponentModel.EditorBrowsableAttribute(global::System.ComponentModel.EditorBrowsableState.Advanced)]
+ internal static global::System.Resources.ResourceManager ResourceManager {
+ get {
+ if (object.ReferenceEquals(resourceMan, null)) {
+ global::System.Resources.ResourceManager temp = new global::System.Resources.ResourceManager("NEventStore.Persistence.Sql.SqlDialects.FirebirdSqlStatements", typeof(FirebirdSqlStatements).Assembly);
+ resourceMan = temp;
+ }
+ return resourceMan;
+ }
+ }
+
+ ///
+ /// Overrides the current thread's CurrentUICulture property for all
+ /// resource lookups using this strongly typed resource class.
+ ///
+ [global::System.ComponentModel.EditorBrowsableAttribute(global::System.ComponentModel.EditorBrowsableState.Advanced)]
+ internal static global::System.Globalization.CultureInfo Culture {
+ get {
+ return resourceCulture;
+ }
+ set {
+ resourceCulture = value;
+ }
+ }
+
+ ///
+ /// Looks up a localized string similar to INSERT
+ ///INTO Snapshots
+ /// ( BucketId, StreamId, StreamRevision, Payload )
+ ///SELECT @BucketId, @StreamId, @StreamRevision, @Payload FROM rdb$database
+ ///WHERE EXISTS ( SELECT * FROM Commits WHERE BucketId = @BucketId AND StreamId = @StreamId AND (StreamRevision - Items) <= @StreamRevision )
+ ///AND NOT EXISTS ( SELECT * FROM Snapshots WHERE BucketId = @BucketId AND StreamId = @StreamId AND StreamRevision = @StreamRevision );.
+ ///
+ internal static string AppendSnapshotToCommit {
+ get {
+ return ResourceManager.GetString("AppendSnapshotToCommit", resourceCulture);
+ }
+ }
+
+ ///
+ /// Looks up a localized string similar to SELECT FIRST @Limit SKIP @Skip BucketId, StreamId, StreamIdOriginal, StreamRevision, CommitId, CommitSequence, CommitStamp, CheckpointNumber, Headers, Payload
+ /// FROM Commits
+ /// WHERE BucketId = @BucketId
+ /// AND CheckpointNumber > @CheckpointNumber
+ /// ORDER BY CheckpointNumber;.
+ ///
+ internal static string GetCommitsFromBucketAndCheckpoint {
+ get {
+ return ResourceManager.GetString("GetCommitsFromBucketAndCheckpoint", resourceCulture);
+ }
+ }
+
+ ///
+ /// Looks up a localized string similar to SELECT FIRST @Limit SKIP @Skip BucketId, StreamId, StreamIdOriginal, StreamRevision, CommitId, CommitSequence, CommitStamp, CheckpointNumber, Headers, Payload
+ ///FROM Commits
+ ///WHERE CheckpointNumber > @CheckpointNumber
+ ///ORDER BY CheckpointNumber;.
+ ///
+ internal static string GetCommitsFromCheckpoint {
+ get {
+ return ResourceManager.GetString("GetCommitsFromCheckpoint", resourceCulture);
+ }
+ }
+
+ ///
+ /// Looks up a localized string similar to SELECT FIRST @Limit SKIP @Skip BucketId, StreamId, StreamIdOriginal, StreamRevision, CommitId, CommitSequence, CommitStamp, CheckpointNumber, Headers, Payload
+ /// FROM Commits
+ /// WHERE BucketId = @BucketId AND CommitStamp >= @CommitStamp
+ /// ORDER BY CommitStamp, StreamId, CommitSequence;.
+ ///
+ internal static string GetCommitsFromInstant {
+ get {
+ return ResourceManager.GetString("GetCommitsFromInstant", resourceCulture);
+ }
+ }
+
+ ///
+ /// Looks up a localized string similar to SELECT FIRST @Limit BucketId, StreamId, StreamIdOriginal, StreamRevision, CommitId, CommitSequence, CommitStamp, CheckpointNumber, Headers, Payload
+ /// FROM Commits
+ /// WHERE BucketId = @BucketId
+ /// AND StreamId = @StreamId
+ /// AND StreamRevision >= @StreamRevision
+ /// AND (StreamRevision - Items) < @MaxStreamRevision
+ /// AND CommitSequence > @CommitSequence
+ /// ORDER BY CommitSequence;.
+ ///
+ internal static string GetCommitsFromStartingRevision {
+ get {
+ return ResourceManager.GetString("GetCommitsFromStartingRevision", resourceCulture);
+ }
+ }
+
+ ///
+ /// Looks up a localized string similar to SELECT FIRST @Limit SKIP @Skip BucketId, StreamId, StreamIdOriginal, StreamRevision, CommitId, CommitSequence, CommitStamp, CheckpointNumber, Headers, Payload
+ /// FROM Commits
+ /// WHERE BucketId = @BucketId
+ /// AND CommitStamp >= @CommitStampStart
+ /// AND CommitStamp < @CommitStampEnd
+ /// ORDER BY CommitStamp, StreamId, CommitSequence;.
+ ///
+ internal static string GetCommitsFromToInstant {
+ get {
+ return ResourceManager.GetString("GetCommitsFromToInstant", resourceCulture);
+ }
+ }
+
+ ///
+ /// Looks up a localized string similar to SELECT FIRST 1 *
+ /// FROM Snapshots
+ /// WHERE BucketId = @BucketId
+ /// AND StreamId = @StreamId
+ /// AND StreamRevision <= @StreamRevision
+ /// ORDER BY StreamRevision DESC;.
+ ///
+ internal static string GetSnapshot {
+ get {
+ return ResourceManager.GetString("GetSnapshot", resourceCulture);
+ }
+ }
+
+ ///
+ /// Looks up a localized string similar to SELECT FIRST @Limit C.BucketId, C.StreamId, C.StreamIdOriginal, MAX(C.StreamRevision) AS StreamRevision, MAX(COALESCE(S.StreamRevision, 0)) AS SnapshotRevision
+ /// FROM Commits AS C
+ /// LEFT OUTER JOIN Snapshots AS S
+ /// ON C.BucketId = @BucketId
+ /// AND C.StreamId = S.StreamId
+ /// AND C.StreamRevision >= S.StreamRevision
+ /// GROUP BY C.StreamId, C.BucketId, C.StreamIdOriginal
+ ///HAVING MAX(C.StreamRevision) >= MAX(COALESCE(S.StreamRevision, 0)) + @Threshold
+ /// ORDER BY C.StreamId;.
+ ///
+ internal static string GetStreamsRequiringSnapshots {
+ get {
+ return ResourceManager.GetString("GetStreamsRequiringSnapshots", resourceCulture);
+ }
+ }
+
+ ///
+ /// Looks up a localized string similar to SELECT FIRST @Limit SKIP @Skip BucketId, StreamId, StreamIdOriginal, StreamRevision, CommitId, CommitSequence, CommitStamp, CheckpointNumber, Headers, Payload
+ /// FROM Commits
+ /// WHERE Dispatched = 0
+ /// ORDER BY CheckpointNumber;.
+ ///
+ internal static string GetUndispatchedCommits {
+ get {
+ return ResourceManager.GetString("GetUndispatchedCommits", resourceCulture);
+ }
+ }
+
+ ///
+ /// Looks up a localized string similar to CREATE TABLE Commits
+ /// (
+ /// BucketId varchar(40) NOT NULL,
+ /// StreamId char(40) NOT NULL,
+ /// StreamIdOriginal varchar (1000) NOT NULL,
+ /// StreamRevision int NOT NULL CHECK (StreamRevision > 0),
+ /// Items int NOT NULL CHECK (Items > 0),
+ /// CommitId char(16) character set octets NOT NULL,
+ /// CommitSequence int NOT NULL CHECK (CommitSequence > 0),
+ /// CommitStamp timestamp NOT NULL,
+ /// CheckpointNumber int PRIMARY KEY,
+ /// Dispatched char (1) DEFAULT '0' NOT NULL,
+ /// Headers blob,
+ /// Paylo [rest of string was truncated]";.
+ ///
+ internal static string InitializeStorage {
+ get {
+ return ResourceManager.GetString("InitializeStorage", resourceCulture);
+ }
+ }
+
+ ///
+ /// Looks up a localized string similar to INSERT
+ /// INTO Commits
+ /// ( BucketId, StreamId, StreamIdOriginal, CommitId, CommitSequence, StreamRevision, Items, CommitStamp, Headers, Payload )
+ ///VALUES (@BucketId, @StreamId, @StreamIdOriginal, @CommitId, @CommitSequence, @StreamRevision, @Items, @CommitStamp, @Headers, @Payload)
+ ///RETURNING CheckpointNumber;.
+ ///
+ internal static string PersistCommits {
+ get {
+ return ResourceManager.GetString("PersistCommits", resourceCulture);
+ }
+ }
+ }
+}